"""Tools for automatically converting a Keras model to a Nengo network."""
import collections
import logging
import warnings
import nengo
import numpy as np
import tensorflow as tf
from tensorflow.python.keras.layers import BatchNormalization, BatchNormalizationV2
from tensorflow.python.util import nest
from nengo_dl.config import configure_settings
from nengo_dl.tensor_node import Layer, TensorNode
from nengo_dl.simulator import Simulator
logger = logging.getLogger(__name__)
class Converter:
"""
Converts a Keras model to a Nengo network composed of native Nengo objects.
Parameters
----------
model : ``tf.keras.Model``
Keras model to be converted
allow_fallback : bool
If True, allow layers that cannot be converted to native Nengo
objects to be added as a `.TensorNode` instead. Note that if this occurs, the
converted Nengo network will only be runnable in the NengoDL simulator.
inference_only : bool
Allow layers to be converted in such a way that inference behaviour will
match the source model but training behaviour will not.
If ``inference_only=False`` then some
layers cannot be converted to native Nengo objects (but you can
still use ``allow_fallback=True`` to use a `.TensorNode` instead).
max_to_avg_pool : bool
If True, convert max pooling layers to average pooling layers. Note that this
will change the behaviour of the network, so parameters will probably need to
be re-trained in NengoDL. If ``max_to_avg_pool=False`` then max pooling layers
cannot be converted to native Nengo objects (but you can
still use ``allow_fallback=True`` to use a `.TensorNode` instead).
split_shared_weights : bool
In Keras, applying the same ``Layer`` object to different input layers will
result in multiple instances of the given layer that share the same weights.
This is not supported in Nengo. If ``split_shared_weights=True`` then those
shared weights will be split into independent sets of weights. They will all
be initialized to the same value, so the initial behaviour of the model will
be unchanged, but if any further training is performed on the network then
the weights in each of those instances may diverge.
swap_activations : dict
A dictionary mapping from TensorFlow activation functions or Nengo neuron types
to TensorFlow activation functions or Nengo neuron types. This can be used to
change all the activation types in a model to some other type. This is in
addition to the default activation map (see `.LayerConverter`). It can be keyed
based on either TensorFlow or Nengo activation types, and will be applied both
before and after the default activation map, in order to support whatever swap
type is most useful for a given model. In particular, ``swap_activations``
can be useful for swapping rate neuron types to spiking neuron types,
through e.g. ``{tf.nn.relu: nengo.SpikingRectifiedLinear()}`` or
``{nengo.RectifiedLinear(): nengo.SpikingRectifiedLinear()}``. Or it can be
used to swap activation types that don't have a native Nengo implementation,
e.g. ``{tf.keras.activations.elu: tf.keras.activations.relu}``.
Attributes
----------
model : ``tf.keras.Model``
The input Keras model (if input was a Sequential model then this will be the
equivalent Functional model).
net : `nengo.Network`
The converted Nengo network.
inputs : `.Converter.TensorDict`
Maps from Keras model inputs to input Nodes in the converted Nengo network.
For example, ``my_node = Converter(my_model).inputs[my_model.input]``.
outputs : `.Converter.TensorDict`
Maps from Keras model outputs to output Probes in the converted Nengo network.
For example, ``my_probe = Converter(my_model).outputs[my_model.output]``.
"""
converters = {}
def __init__(
self,
model,
allow_fallback=True,
inference_only=False,
max_to_avg_pool=False,
split_shared_weights=False,
swap_activations=None,
):
self.allow_fallback = allow_fallback
self.inference_only = inference_only
self.max_to_avg_pool = max_to_avg_pool
self.split_shared_weights = split_shared_weights
self.swap_activations = swap_activations or {}
self.layer_map = collections.defaultdict(dict)
self._layer_converters = {}
with nengo.Network(label=model.name) as self.net:
# add the "trainable" attribute to all objects
configure_settings(trainable=None, inference_only=self.inference_only)
# convert model
self.get_converter(model).convert(None)
if isinstance(model, tf.keras.Sequential):
# if someone passes a sequential model we convert it to a functional
# model and then convert that to a nengo model, so make the functional
# model accessible here
warnings.warn("Converting sequential model to functional model")
self.model = self.get_converter(model).layer
else:
self.model = model
# track inputs/outputs of model on network object
self.inputs = Converter.TensorDict()
for input in self.model.inputs:
(
input_layer,
input_node_id,
input_tensor_id,
) = LayerConverter.get_history(input)
self.inputs[input] = self.layer_map[input_layer][input_node_id][
input_tensor_id
]
self.outputs = Converter.TensorDict()
for output in self.model.outputs:
(
output_layer,
output_node_id,
output_tensor_id,
) = LayerConverter.get_history(output)
output_obj = self.layer_map[output_layer][output_node_id][
output_tensor_id
]
# add probes to outputs
logger.info("Probing %s (%s)", output_obj, output)
self.outputs[output] = nengo.Probe(output_obj)
def verify(self, training=False, inputs=None):
"""
Verify that output of converted Nengo network matches the original Keras model.
Parameters
----------
training : bool
If True, check that optimizing the converted Nengo network produces the same
results as optimizing the original Keras model.
inputs : list of `numpy.ndarray`
Testing values for model inputs (if not specified, array of ones will be
used).
Returns
-------
success : bool
True if output of Nengo network matches output of Keras model.
Raises
------
ValueError
If output of Nengo network does not match output of Keras model.
"""
epochs = 3
if inputs is None:
batch_size = 2
inp_vals = [np.ones((batch_size,) + x.shape[1:]) for x in self.model.inputs]
else:
batch_size = inputs[0].shape[0]
inp_vals = inputs
# get keras model output
if training:
out_vals = [
np.ones((batch_size,) + x.shape[1:]) for x in self.model.outputs
]
self.model.compile(optimizer=tf.optimizers.SGD(0.1), loss=tf.losses.mse)
self.model.fit(inp_vals, out_vals, epochs=epochs)
keras_out = self.model.predict(inp_vals)
if not isinstance(keras_out, (list, tuple)):
keras_out = [keras_out]
# get nengo sim output
inp_vals = [np.reshape(x, (batch_size, 1, -1)) for x in inp_vals]
with Simulator(self.net, minibatch_size=batch_size) as sim:
if training:
keras_params = sum(
np.prod(w.shape) for w in self.model.trainable_weights
)
nengo_params = sum(
np.prod(w.shape) for w in sim.keras_model.trainable_weights
)
if keras_params != nengo_params:
raise ValueError(
"Number of trainable parameters in Nengo network (%d) does not "
"match number of trainable parameters in Keras model (%d)"
% (nengo_params, keras_params)
)
out_vals = [np.reshape(x, (batch_size, 1, -1)) for x in out_vals]
sim.compile(optimizer=tf.optimizers.SGD(0.1), loss=tf.losses.mse)
sim.fit(inp_vals, out_vals, epochs=epochs)
sim_out = sim.predict(inp_vals)
for i, out in enumerate(self.model.outputs):
keras_vals = np.ravel(keras_out[i])
nengo_vals = np.ravel(sim_out[self.outputs[out]])
if not np.allclose(keras_vals, nengo_vals):
logger.info("Verification failure")
logger.info("Keras:\n%s", keras_vals)
logger.info("Nengo:\n%s", nengo_vals)
raise ValueError(
"Output of Keras model does not match output of converted "
"Nengo network"
)
return True
def get_converter(self, layer):
"""
Get instantiated `.LayerConverter` for the given ``Layer`` instance.
Note that this caches the results, so calling the function multiple times
with the same Layer instance will return the same LayerConverter instance.
Parameters
----------
layer : ``tf.keras.layers.Layer``
The Keras Layer being converted.
Returns
-------
converter : `.LayerConverter`
LayerConverter class for converting ``layer`` to Nengo objects.
"""
if layer in self._layer_converters:
# already have an instantiated converter for this layer
converter = self._layer_converters[layer]
if converter.has_weights and not self.split_shared_weights:
# TODO: allow fallback
raise ValueError(
"Multiple applications of layer %s detected; this is not supported "
"unless split_shared_weights=True" % layer
)
return converter
# check if there is a registered builder for this layer type
ConverterClass = self.converters.get(type(layer), None)
# perform custom checks in layer converters
if ConverterClass is None:
error_msg = "Layer type %s does not have a registered converter" % type(
layer
)
else:
convertible, error_msg = ConverterClass.convertible(layer, self)
if not convertible:
ConverterClass = None
if ConverterClass is None:
# this means that there is no LayerConverter compatible with this layer
# (either because it has an unknown type, or it failed the ``.convertible``
# check due to its internal parameterization)
if self.allow_fallback:
warnings.warn(
"%sFalling back to TensorNode."
% (error_msg + ". " if error_msg else "")
)
ConverterClass = self.converters[None]
else:
raise TypeError(
"%sUnable to convert layer %s to native Nengo objects; set "
"allow_fallback=True if you would like to use a TensorNode "
"instead, or consider registering a custom LayerConverter for this "
"layer type." % (error_msg + ". " if error_msg else "", layer.name)
)
converter = ConverterClass(layer, self)
self._layer_converters[layer] = converter
return converter
@classmethod
def register(cls, keras_layer):
"""
A decorator for adding a class to the converter registry.
Parameters
----------
keras_layer : ``tf.keras.layers.Layer``
The Layer associated with the conversion function being registered.
"""
def register_converter(convert_cls):
if keras_layer in cls.converters:
warnings.warn(
"Layer '%s' already has a converter. Overwriting." % keras_layer
)
cls.converters[keras_layer] = convert_cls
return convert_cls
return register_converter
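# Registration sketch (illustrative; ``MyLayer`` and ``ConvertMyLayer`` are hypothetical
# names, not part of this module):
#
#     @Converter.register(MyLayer)
#     class ConvertMyLayer(LayerConverter):
#         def convert(self, node_id):
#             output = self.add_nengo_obj(node_id)
#             self.add_connection(node_id, output)
#             return output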
class TensorDict:
"""A dictionary-like object that works with TensorFlow Tensors."""
def __init__(self):
self.dict = collections.OrderedDict()
def __setitem__(self, key, val):
if isinstance(key, tf.Tensor):
key = key.experimental_ref()
self.dict[key] = val
def __getitem__(self, key):
if isinstance(key, tf.Tensor):
key = key.experimental_ref()
return self.dict[key]
class LayerConverter:
"""
Base class for converter classes, which contain the logic for mapping some Keras
layer type to Nengo objects.
Subclasses must implement the `.LayerConverter.convert` method. They may optionally
extend `.LayerConverter.convertible` if this layer type requires custom logic for
whether or not a layer can be converted to Nengo objects.
Subclasses should override the ``unsupported_args`` class parameter if there are
certain non-default Layer attributes that are not supported by the converter.
This is a list of names for attributes that must have the default value for the
layer to be convertible. The default is assumed to be ``None``, or a tuple of
``("attribute_name", default_value)`` can be specified. If there are parameters
that are supported in inference mode but not in training mode, they should be
added to the ``unsupported_training_args`` parameter.
Subclasses should override the ``has_weights`` class parameter if the layer type
being converted contains internal weights (this affects how the converter will
handle duplicate layers).
Parameters
----------
layer : ``tf.keras.layers.Layer``
The Layer object being converted.
converter : `.Converter`
The parent Converter class running the conversion process.
"""
# maps from TensorFlow activation functions to Nengo neuron types
activation_map = {
None: None,
tf.keras.activations.linear: None,
tf.keras.activations.relu: nengo.RectifiedLinear(),
tf.nn.relu: nengo.RectifiedLinear(),
tf.keras.activations.sigmoid: nengo.Sigmoid(tau_ref=1),
tf.nn.sigmoid: nengo.Sigmoid(tau_ref=1),
}
# attributes of the Keras layer that are not supported for non-default values.
# the default value is assumed to be None, or a tuple of
# ("attr_name", default_value) can be specified
unsupported_args = []
# attributes that are supported in inference_only mode but otherwise not
unsupported_training_args = []
# whether or not this layer contains trainable weights (this indicates whether
# this layer is affected by split_shared_weights)
has_weights = False
def __init__(self, layer, converter):
self.layer = layer
self.converter = converter
def add_nengo_obj(self, node_id, biases=None, activation=None):
"""
Builds a Nengo object for the given Node of this layer.
Parameters
----------
node_id : int
The index of the Keras Node currently being built on this layer.
biases : `numpy.ndarray` or None
If not None, add trainable biases with the given value.
activation : callable or None
The TensorFlow activation function to be used (``None`` will be
interpreted as linear activation).
Returns
-------
obj : `nengo.Node` or `nengo.ensemble.Neurons` or `nengo_dl.TensorNode`
The Nengo object whose output corresponds to the output of the given Keras
Node.
"""
name = self.layer.name + ".%d" % node_id
# apply manually specified swaps
activation = self.converter.swap_activations.get(activation, activation)
if activation in self.activation_map or isinstance(
activation, nengo.neurons.NeuronType
):
activation = self.activation_map.get(activation, activation)
# apply any nengo->nengo swaps
activation = self.converter.swap_activations.get(activation, activation)
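# e.g. (illustrative): with swap_activations={tf.nn.relu: nengo.SpikingRectifiedLinear()}
# the first lookup already yields a Nengo neuron type, which activation_map and the second
# lookup leave unchanged; with {nengo.RectifiedLinear(): nengo.SpikingRectifiedLinear()}
# tf.nn.relu is first mapped to RectifiedLinear() by activation_map, then swapped here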
if activation is None:
# linear activation, uses a passthrough Node
obj = nengo.Node(
size_in=np.prod(self.output_shape(node_id)), label=name,
)
if biases is not None:
# use a connection from a constant node (so that the bias
# values will be trainable)
bias_node = nengo.Node([1], label="%s.bias_node" % name)
nengo.Connection(
bias_node, obj, transform=biases[:, None], synapse=None
)
else:
# use ensemble to implement the appropriate neuron type
obj = nengo.Ensemble(
np.prod(self.output_shape(node_id)),
1,
neuron_type=activation,
gain=nengo.dists.Choice([1]),
bias=nengo.dists.Choice([0]) if biases is None else biases,
label=name,
).neurons
if biases is None:
# ensembles always have biases, so if biases=None we just use
# all-zero biases and mark them as non-trainable
self.converter.net.config[obj].trainable = False
elif self.converter.allow_fallback:
warnings.warn(
"Activation type %s does not have a native Nengo equivalent; "
"falling back to a TensorNode" % activation
)
obj = TensorNode(
activation,
shape_in=self.input_shape(node_id),
pass_time=False,
label=name,
)
else:
raise TypeError("Unsupported activation type (%s)" % self.layer.activation)
logger.info("Created %s (size=%d)", obj, obj.size_out)
return obj
def add_connection(self, node_id, obj, input_idx=0, trainable=False, **kwargs):
"""
Adds a Connection from one of the inputs of the Node being built to the
Nengo object.
Parameters
----------
node_id : int
The index of the Keras Node currently being built on this layer.
obj : ``NengoObject``
The Nengo object implementing this Node.
input_idx : int
Which of the inputs we want to add a Connection for (in the case of
layers that have multiple inputs).
trainable : bool
Whether or not the weights associated with the created Connection
should be trainable.
kwargs : dict
Will be passed on to `nengo.Connection`.
Returns
-------
conn : `nengo.Connection`
The constructed Connection object.
"""
conn = nengo.Connection(
self.get_input_obj(node_id, tensor_idx=input_idx),
obj,
synapse=None,
**kwargs,
)
self.converter.net.config[conn].trainable = trainable
logger.info(
"Connected %s to %s (trainable=%s)", conn.pre, conn.post, trainable,
)
return conn
def _get_shape(self, input_output, node_id, include_batch=False):
"""
Looks up the input or output shape of this Node.
Parameters
----------
input_output : "input" or "output"
Whether we want the input or output shape.
node_id : int
The node whose shape we want to look up.
include_batch : bool
Whether or not the returned shape should include the batch dimension.
Returns
-------
shape : (list of) tuple of int
A single tuple shape if the node has one input/output, or a list of shapes
if the node has multiple inputs/outputs.
"""
# note: layer.get_input/output_shape_at is generally equivalent to
# layer.input/output_shape, except when the layer is called multiple times
# with different shapes, in which case input/output_shape is not well defined
func = getattr(self.layer, "get_%s_shape_at" % input_output)
# get the shape
shape = func(node_id)
if not include_batch:
if isinstance(shape, list):
# multiple inputs/outputs; trim the batch from each one
shape = [s[1:] for s in shape]
else:
shape = shape[1:]
return shape
def output_shape(self, node_id, include_batch=False):
"""
Returns the output shape of the given node.
Parameters
----------
node_id : int
The node whose shape we want to look up.
include_batch : bool
Whether or not the returned shape should include the batch dimension.
Returns
-------
shape : (list of) tuple of int
A single tuple shape if the node has one output, or a list of shapes
if the node as multiple outputs.
"""
return self._get_shape("output", node_id, include_batch=include_batch)
@staticmethod
def get_history(tensor):
"""
Returns the Keras history (layer/node_idx/tensor_idx) that defined this tensor.
This function contains additional logic so that if ``tensor`` is the output of
a Model then the history will trace into the internal layers of that Model
(rather than skipping to the input of that Model, which is the default Keras
history).
Parameters
----------
tensor : ``tf.Tensor``
The tensor whose Keras history we want to look up.
Returns
-------
layer : ``tf.keras.layers.Layer``
The Layer object that created this Tensor.
node_index : int
The index of the outbound node of ``layer`` that created this Tensor.
tensor_index : int
The index in the output of the Node corresponding to this Tensor (for
Nodes with multiple outputs).
"""
layer, node_index, tensor_index = tensor._keras_history
while isinstance(layer, tf.keras.Model):
# models have an output Identity transform that stores the history that
# "skips" the internals of the model; we want to traverse into the internals
# of the model, so we go back to the input of that identity op (which
# is the real output tensor from the model)
assert tensor.op.type == "Identity"
tensor = tensor.op.inputs[0]
layer, node_index, tensor_index = tensor._keras_history
return layer, node_index, tensor_index
@classmethod
def convertible(cls, layer, converter):
"""
Check whether the given Keras layer is convertible to native Nengo objects.
Parameters
----------
layer : ``tf.keras.layers.Layer``
The Keras Layer we want to convert.
converter : `.Converter`
The Converter object running the conversion process.
Returns
-------
convertible : bool
True if the layer can be converted to native Nengo objects, else False.
"""
# check if the layer uses any unsupported arguments
unsupported = cls.unsupported_args
if not converter.inference_only:
unsupported = unsupported + cls.unsupported_training_args
for arg in unsupported:
if isinstance(arg, str):
default = None
else:
arg, default = arg
val = getattr(layer, arg)
if val != default:
msg = "%s.%s has value %s != %s, which is not supported" % (
layer.name,
arg,
val,
default,
)
if arg in cls.unsupported_training_args:
msg += " (unless inference_only=True)"
return False, msg
return True, None
def convert(self, node_id):
"""
Convert the given node of this layer to Nengo objects.
Parameters
----------
node_id : int
The index of the inbound node to be converted.
Returns
-------
output : ``NengoObject``
Nengo object whose output corresponds to the output of the Keras layer node.
"""
raise NotImplementedError("Subclasses must implement convert")
@Converter.register(tf.keras.Model)
class ConvertModel(LayerConverter):
"""Convert ``tf.keras.Model`` to Nengo objects."""
def convert(self, node_id):
# should never be building a model except in the top-level converter
assert node_id is None
logger.info("=" * 30)
logger.info("Converting model %s", self.layer.name)
# functional models should already have been built when the model
# was instantiated
assert self.layer.built
# trace the model to find all the tensors (which correspond to layers/nodes)
# that need to be built into the Nengo network
source_tensors = self.trace_tensors(self.layer.outputs)
def sort_key(x):
# sort tensors so that order of model inputs/outputs is preserved
for i, y in enumerate(self.layer.inputs):
if x is y:
return -(len(self.layer.inputs) - i)
for i, y in enumerate(self.layer.outputs):
if x is y:
return i + 1
return 0
source_tensors = sorted(source_tensors, key=sort_key)
for tensor in source_tensors:
# look up the layer/node to be converted
model_layer, model_node_id, _ = self.get_history(tensor)
if model_node_id in self.converter.layer_map[model_layer]:
# already built this node
continue
logger.info("-" * 30)
logger.info("Converting layer %s node %d", model_layer.name, model_node_id)
# get the layerconverter object
layer_converter = self.converter.get_converter(model_layer)
# build the Nengo objects
nengo_layer = layer_converter.convert(model_node_id)
assert isinstance(
nengo_layer, (nengo.Node, nengo.ensemble.Neurons, TensorNode),
)
# add output of layer_converter to layer_map
self.converter.layer_map[model_layer][model_node_id] = [nengo_layer]
logger.info("=" * 30)
# note: not returning anything, because we don't need to store anything in
# the layer map (this network is only used by the top-level converter class)
def trace_tensors(self, tensors, results=None):
"""
Recursively trace all the upstream layer tensors, starting from ``tensors``.
Parameters
----------
tensors : list of ``tf.Tensor``
Tensors representing the output of some layers.
results : list of ``tf.Tensor``
Output tensors for all the layers leading up to and including ``tensors``.
This will be populated in-place during the recursive execution.
Returns
-------
results : list of ``tf.Tensor``
The same as the ``results`` parameter (returned so that the top-level call,
which may not have a reference to the ``results`` list, can get the results).
"""
# brief intro to the keras functional graph structure:
# - a node represents the application of some layer to an input tensor
# - whenever a layer B is applied to the output of layer A a new Node is
# created; this Node is added to A.outbound_nodes and B.inbound_nodes
# - every Node has input tensors x (which will be the input to B) and output
# tensors y (the output of B)
# - every tensor tracks the layer/node that created it in _keras_history
# (so the _keras_history of y would be (B, 0) (where 0 is the index within
# B.inbound_nodes corresponding to the node that created y); note that x was
# created whenever A was applied to some other layer, so its _keras_history is
# unrelated to the application of B
# for example, if we apply multiple layers B/C/D to the output of some
# layer A:
# b = B(a)
# c = C(a)
# d = D(a)
# each time will create a new Node. so we will have 3 nodes total;
# A will have 3 outbound nodes, and B/C/D will each have one inbound node.
# every node will have the same input tensor (a), but a different
# output tensor (the output of B/C/D) with keras_history (B, 0), (C, 0), and
# (D, 0).
# on the other hand, if we take one layer and apply it to multiple inputs:
# d0 = D(a)
# d1 = D(b)
# d2 = D(c)
# again we will have 3 nodes total. D will have 3 inbound nodes, and A/B/C will each
# have one outbound node. each node will have a different input tensor (the
# output of A/B/C), but _also a different output tensor_ (the result of applying
# D to each one of those inputs) with keras history (D, 0), (D, 1), (D, 2)
if results is None:
results = []
logger.debug("===starting trace_tensors===")
logger.debug("Tracing tensors %s", tensors)
for tensor in tensors:
if any(tensor is y for y in results):
# already traced this tensor
continue
layer, node_index, _ = self.get_history(tensor)
logger.debug("---")
logger.debug("Layer %s node %s", layer.name, node_index)
if layer.inbound_nodes:
node = layer.inbound_nodes[node_index]
if node.inbound_layers:
logger.debug("Input layers %s", node.inbound_layers)
logger.debug("Input tensors %s", node.input_tensors)
# not an input layer, so continue recursion
self.trace_tensors(
nest.flatten(node.input_tensors), results=results
)
results.append(tensor)
logger.debug("===done trace_tensors===")
return results
@Converter.register(tf.keras.Sequential)
class ConvertSequential(ConvertModel):
"""Convert ``tf.keras.Sequential`` to Nengo objects."""
def __init__(self, seq_model, converter):
# convert sequential model to functional model
input_shape = seq_model.layers[0].input_shape
inp = x = tf.keras.Input(batch_shape=input_shape)
for layer in seq_model.layers:
x = layer(x)
func_model = tf.keras.Model(inp, x)
super().__init__(func_model, converter)
@Converter.register(None)
class ConvertFallback(LayerConverter):
"""
Convert layers which do not have a native Nengo equivalent into a
`.TensorNode`.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# copy layer, so that any changes to the layer (e.g., rebuilding it), do not
# affect the source model
layer = self.layer.__class__.from_config(self.layer.get_config())
if self.layer.built:
layer.build(self.input_shape(0, include_batch=True))
layer.set_weights(self.layer.get_weights())
self.tensor_layer = Layer(layer)
def convert(self, node_id):
logger.info("Using TensorNode %s", self.tensor_layer)
input_obj = self.get_input_obj(node_id)
output = self.tensor_layer(
input_obj, shape_in=self.input_shape(node_id), label=self.layer.name
)
logger.info("Applying to %s, created %s", input_obj, output)
return output
class ConvertAvgPool(LayerConverter):
"""Base class for converting average pooling layers to Nengo objects."""
# "same" padding not supported because TensorFlow average pooling does not count
# the padded cells in the average, which we don't have a way to do using Convolution
unsupported_args = [("padding", "valid")]
def convert(self, node_id, dimensions):
output = self.add_nengo_obj(node_id)
def to_tuple(val):
if isinstance(val, int):
return (val,) * dimensions
return val
spatial_shape = (
self.input_shape(node_id)[:-1]
if self.layer.data_format == "channels_last"
else self.input_shape(node_id)[1:]
)
# the default values here are for GlobalAveragePooling (which doesn't have
# these attributes)
pool_size = to_tuple(getattr(self.layer, "pool_size", spatial_shape))
padding = getattr(self.layer, "padding", "valid")
strides = to_tuple(getattr(self.layer, "strides", 1))
# the idea here is that we set up a convolutional transform with weights 1/n,
# which will have the effect of implementing average pooling
n_filters = (
self.output_shape(node_id)[-1]
if self.layer.data_format == "channels_last"
else self.output_shape(node_id)[0]
)
n_pool = np.prod(pool_size)
kernel = np.reshape(
[np.eye(n_filters) / n_pool] * n_pool, pool_size + (n_filters, n_filters),
)
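# e.g. (illustrative): pool_size=(2, 2) with n_filters=3 gives n_pool=4 and a kernel of
# shape (2, 2, 3, 3) where every spatial tap is eye(3) / 4, so each output channel is the
# average of the corresponding input channel over a 2x2 window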
pool_conv = nengo.Convolution(
n_filters=n_filters,
input_shape=self.input_shape(node_id),
padding=padding,
strides=strides,
kernel_size=pool_size,
init=kernel,
channels_last=self.layer.data_format == "channels_last",
)
self.add_connection(node_id, output, transform=pool_conv)
return output
@classmethod
def convertible(cls, layer, converter):
if (
isinstance(
layer,
(
tf.keras.layers.MaxPool1D,
tf.keras.layers.MaxPool2D,
tf.keras.layers.MaxPool3D,
tf.keras.layers.GlobalMaxPool1D,
tf.keras.layers.GlobalMaxPool2D,
tf.keras.layers.GlobalMaxPool3D,
),
)
and not converter.max_to_avg_pool
):
msg = (
"Cannot convert max pooling layers to native Nengo objects; consider "
"setting max_to_avg_pool=True to use average pooling instead"
)
return False, msg
unsupported = cls.unsupported_args
if not hasattr(layer, "padding"):
# global layers don't have this attribute, so we temporarily remove it
# from the unsupported args
cls.unsupported_args = []
try:
convertible = super().convertible(layer, converter)
finally:
# reset the unsupported args
cls.unsupported_args = unsupported
return convertible
@Converter.register(tf.keras.layers.Activation)
class ConvertActivation(LayerConverter):
"""Convert ``tf.keras.layers.Activation`` to Nengo objects."""
def convert(self, node_id):
output = self.add_nengo_obj(node_id, activation=self.layer.activation)
self.add_connection(node_id, output)
return output
@Converter.register(tf.keras.layers.Add)
class ConvertAdd(LayerConverter):
"""Convert ``tf.keras.layers.Add`` to Nengo objects."""
def convert(self, node_id):
output = self.add_nengo_obj(node_id)
for i in range(len(self.layer.input)):
self.add_connection(node_id, output, input_idx=i)
return output
@Converter.register(tf.keras.layers.Average)
class ConvertAverage(LayerConverter):
"""Convert ``tf.keras.layers.Average`` to Nengo objects."""
def convert(self, node_id):
output = self.add_nengo_obj(node_id)
for i in range(len(self.layer.input)):
self.add_connection(
node_id, output, input_idx=i, transform=1 / len(self.layer.input)
)
return output
@Converter.register(tf.keras.layers.AvgPool1D)
@Converter.register(tf.keras.layers.MaxPool1D)
@Converter.register(tf.keras.layers.GlobalAvgPool1D)
@Converter.register(tf.keras.layers.GlobalMaxPool1D)
class ConvertAvgPool1D(ConvertAvgPool):
"""
Convert ``tf.keras.layers.AvgPool1D`` to Nengo objects.
Also works for ``tf.keras.layers.GlobalAvgPool1D``, and
``tf.keras.layers.MaxPool1D``/``GlobalMaxPool1D`` (if ``max_to_avg_pool=True``).
"""
def convert(self, node_id):
return super().convert(node_id, dimensions=1)
@Converter.register(tf.keras.layers.AvgPool2D)
@Converter.register(tf.keras.layers.MaxPool2D)
@Converter.register(tf.keras.layers.GlobalAvgPool2D)
@Converter.register(tf.keras.layers.GlobalMaxPool2D)
class ConvertAvgPool2D(ConvertAvgPool):
"""
Convert ``tf.keras.layers.AvgPool2D`` to Nengo objects.
Also works for ``tf.keras.layers.GlobalAvgPool2D``, and
``tf.keras.layers.MaxPool2D``/``GlobalMaxPool2D`` (if ``max_to_avg_pool=True``).
"""
def convert(self, node_id):
return super().convert(node_id, dimensions=2)
@Converter.register(tf.keras.layers.AvgPool3D)
@Converter.register(tf.keras.layers.MaxPool3D)
@Converter.register(tf.keras.layers.GlobalAvgPool3D)
@Converter.register(tf.keras.layers.GlobalMaxPool3D)
class ConvertAvgPool3D(ConvertAvgPool):
"""
Convert ``tf.keras.layers.AvgPool3D`` to Nengo objects.
Also works for ``tf.keras.layers.GlobalAvgPool3D``, and
``tf.keras.layers.MaxPool3D``/``GlobalMaxPool3D`` (if ``max_to_avg_pool=True``).
"""
def convert(self, node_id):
return super().convert(node_id, dimensions=3)
@Converter.register(BatchNormalization)
@Converter.register(BatchNormalizationV2)
class ConvertBatchNormalization(LayerConverter):
"""Convert ``tf.keras.layers.BatchNormalization`` to Nengo objects."""
def convert(self, node_id):
# look up the batch normalization parameters
if self.layer.scale:
gamma = tf.keras.backend.get_value(self.layer.gamma)
else:
gamma = 1
if self.layer.center:
beta = tf.keras.backend.get_value(self.layer.beta)
else:
beta = 0
mean, variance = tf.keras.backend.batch_get_value(
(self.layer.moving_mean, self.layer.moving_variance)
)
# compute the fixed affine transform values for this layer
variance += self.layer.epsilon
stddev = np.sqrt(variance)
scale = gamma / stddev
bias = beta - gamma * mean / stddev
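# i.e. gamma * (x - mean) / sqrt(variance + epsilon) + beta has been rewritten as
# scale * x + bias, so inference-time batch normalization is a fixed affine transform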
# build output object
output = self.add_nengo_obj(node_id)
# the batch normalization parameters will be n-dimensional, where n is the
# length of the axis specified in the batch normalization layer. so we need
# to set up a connection structure so that all the elements of the output
# corresponding to one of those axis elements will share the same parameter
assert len(self.layer.axis) == 1
assert self.layer.axis[0] > 0
axis = self.layer.axis[0] - 1 # not counting batch dimension
idxs = np.arange(output.size_in).reshape(self.output_shape(node_id))
slices = [slice(None) for _ in range(len(idxs.shape))]
# broadcast scale/bias along the non-axis dimensions, so that we can apply the
# same scale/bias to all those elements
broadcast_scale = np.zeros(self.output_shape(node_id))
broadcast_bias = np.zeros(self.output_shape(node_id))
for i in range(idxs.shape[axis]):
slices[axis] = i
broadcast_scale[tuple(slices)] = scale[i]
broadcast_bias[tuple(slices)] = bias[i]
broadcast_scale = np.ravel(broadcast_scale)
broadcast_bias = np.ravel(broadcast_bias)
# connect up bias node to output
bias_node = nengo.Node(broadcast_bias)
conn = nengo.Connection(bias_node, output, synapse=None)
self.converter.net.config[conn].trainable = False
# connect input to output, scaled by the batch normalization scale
conn = nengo.Connection(
self.get_input_obj(node_id),
output,
synapse=None,
transform=broadcast_scale,
)
self.converter.net.config[conn].trainable = False
# this is an alternate approach, where rather than broadcasting scale/bias,
# we create individual connections for each element in the batch normalization
# axis. this will result in smaller weight matrices, but more Connections
# TODO: figure out where the tradeoffs lie between these two approaches
# bias_node = nengo.Node(np.ones(idxs[slices].size))
#
# # for each element in the batch normalization axis
# for i in range(idxs.shape[axis]):
# # slice out one element of the output along the axis
# slices[axis] = i
# slice_idxs = np.ravel(idxs[slices])
# sliced_output = output[slice_idxs]
#
# # connect up bias
# conn = nengo.Connection(
# bias_node, sliced_output, synapse=None, transform=bias[i],
# )
# self.converter.net.config[conn].trainable = False
#
# # connect up input with scale applied
# conn = nengo.Connection(
# self.get_input_obj(node_id)[slice_idxs],
# sliced_output,
# synapse=None,
# transform=scale[i],
# )
# self.converter.net.config[conn].trainable = False
return output
@classmethod
def convertible(cls, layer, converter):
if not converter.inference_only:
msg = (
"Cannot convert BatchNormalization layer to native Nengo objects "
"unless inference_only=True"
)
return False, msg
return super().convertible(layer, converter)
@Converter.register(tf.keras.layers.Concatenate)
class ConvertConcatenate(LayerConverter):
"""Convert ``tf.keras.layers.Concatenate`` to Nengo objects."""
def convert(self, node_id):
output = self.add_nengo_obj(node_id)
# axis-1 because not counting batch dimension
axis = self.layer.axis - 1 if self.layer.axis > 0 else self.layer.axis
idxs = np.arange(np.prod(self.output_shape(node_id))).reshape(
self.output_shape(node_id)
)
slices = [slice(None) for _ in range(idxs.ndim)]
offsets = np.cumsum([shape[axis] for shape in self.input_shape(node_id)])
offsets = np.concatenate(([0], offsets))
for i in range(len(self.layer.input)):
slices[axis] = slice(offsets[i], offsets[i + 1])
self.add_connection(
node_id, output[np.ravel(idxs[tuple(slices)])], input_idx=i
)
return output
@classmethod
def convertible(cls, layer, converter):
if layer.axis == 0:
msg = "Cannot concatenate along batch dimension (axis 0)"
return False, msg
return super().convertible(layer, converter)
class ConvertConv(LayerConverter):
"""Base class for converting convolutional layers to Nengo objects."""
has_weights = True
def convert(self, node_id, dimensions):
# look up parameter values from source layer
if self.layer.use_bias:
kernel, biases = tf.keras.backend.batch_get_value(
(self.layer.kernel, self.layer.bias)
)
else:
kernel = tf.keras.backend.get_value(self.layer.kernel)
biases = None
# create nengo object to implement activation function
output = self.add_nengo_obj(node_id, activation=self.layer.activation)
if self.layer.use_bias:
# conv layer biases are per-output-channel, rather than per-output-element,
# so we need to set up a nengo connection structure that will have one
# bias parameter shared across all the spatial dimensions
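# e.g. (illustrative): a Conv2D layer with filters=16 has a bias vector of length 16,
# and bias[i] is broadcast to every spatial position of output channel i through the
# constant bias_node connections below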
if self.layer.data_format == "channels_first":
spatial_size = np.prod(self.output_shape(node_id)[1:])
bias_node = nengo.Node(np.ones(spatial_size), label="conv_bias")
offset = 0
for i in range(self.output_shape(node_id)[0]):
nengo.Connection(
bias_node,
output[offset : offset + spatial_size],
transform=biases[i],
synapse=None,
)
offset += spatial_size
else:
spatial_size = np.prod(self.output_shape(node_id)[:-1])
bias_node = nengo.Node(np.ones(spatial_size), label="conv_bias")
idxs = np.arange(np.prod(self.output_shape(node_id))).reshape(
(-1, self.output_shape(node_id)[-1])
)
for i in range(self.output_shape(node_id)[-1]):
nengo.Connection(
bias_node,
output[idxs[:, i]],
transform=biases[i],
synapse=None,
)
# set up a convolutional transform that matches the layer parameters
transform = nengo.Convolution(
n_filters=self.layer.filters,
input_shape=self.input_shape(node_id),
kernel_size=self.layer.kernel_size,
strides=self.layer.strides,
padding=self.layer.padding,
channels_last=self.layer.data_format == "channels_last",
init=kernel,
)
self.add_connection(node_id, output, transform=transform, trainable=True)
return output
@Converter.register(tf.keras.layers.Conv1D,)
class ConvertConv1D(ConvertConv):
"""Convert ``tf.keras.layers.Conv1D`` to Nengo objects."""
unsupported_args = [
("dilation_rate", (1,)),
]
unsupported_training_args = [
"kernel_regularizer",
"bias_regularizer",
"activity_regularizer",
"kernel_constraint",
"bias_constraint",
]
def convert(self, node_id):
return super().convert(node_id, dimensions=1)
@Converter.register(tf.keras.layers.Conv2D,)
class ConvertConv2D(ConvertConv):
"""Convert ``tf.keras.layers.Conv2D`` to Nengo objects."""
unsupported_args = [
("dilation_rate", (1, 1)),
]
unsupported_training_args = [
"kernel_regularizer",
"bias_regularizer",
"activity_regularizer",
"kernel_constraint",
"bias_constraint",
]
def convert(self, node_id):
return super().convert(node_id, dimensions=2)
@Converter.register(tf.keras.layers.Conv3D,)
class ConvertConv3D(ConvertConv):
"""Convert ``tf.keras.layers.Conv3D`` to Nengo objects."""
unsupported_args = [
("dilation_rate", (1, 1, 1)),
]
unsupported_training_args = [
"kernel_regularizer",
"bias_regularizer",
"activity_regularizer",
"kernel_constraint",
"bias_constraint",
]
def convert(self, node_id):
return super().convert(node_id, dimensions=3)
@Converter.register(tf.keras.layers.Dense)
class ConvertDense(LayerConverter):
"""Convert ``tf.keras.layers.Dense`` to Nengo objects."""
unsupported_training_args = [
"kernel_regularizer",
"bias_regularizer",
"activity_regularizer",
"kernel_constraint",
"bias_constraint",
]
has_weights = True
def convert(self, node_id):
# look up parameter values from source layer
if self.layer.use_bias:
weights, biases = tf.keras.backend.batch_get_value(
(self.layer.kernel, self.layer.bias)
)
else:
weights = tf.keras.backend.get_value(self.layer.kernel)
biases = None
# create nengo object to implement activation function and biases
output = self.add_nengo_obj(
node_id, activation=self.layer.activation, biases=biases
)
# add connection to implement the dense weights
self.add_connection(node_id, output, transform=weights.T, trainable=True)
return output
@Converter.register(tf.keras.layers.Flatten)
class ConvertFlatten(LayerConverter):
"""Convert ``tf.keras.layers.Flatten`` to Nengo objects."""
def convert(self, node_id):
# noop, same as reshape
return self.get_input_obj(node_id)
@Converter.register(tf.keras.layers.ReLU)
class ConvertReLU(LayerConverter):
"""Convert ``tf.keras.layers.ReLU`` to Nengo objects."""
unsupported_args = [("negative_slope", 0), "max_value", ("threshold", 0)]
def convert(self, node_id):
output = self.add_nengo_obj(node_id, biases=None, activation=tf.nn.relu)
self.add_connection(node_id, output)
return output
@Converter.register(tf.keras.layers.Reshape)
class ConvertReshape(LayerConverter):
"""Convert ``tf.keras.layers.Reshape`` to Nengo objects."""
def convert(self, node_id):
# nengo doesn't pass shape information between objects (everything is just a
# vector), so we don't actually need to do anything here, we just return
# the input layer. layers that require shape information can look it up from
# the input_shape attribute of their layer
return self.get_input_obj(node_id)
class ConvertZeroPadding(LayerConverter):
"""Base class for converting zero-padding layers to Nengo objects."""
def convert(self, node_id, dimensions):
output = self.add_nengo_obj(node_id)
# zeropadding1d doesn't have data_format, assumes channels_last
channels_first = (
getattr(self.layer, "data_format", "channels_last") == "channels_first"
)
# the strategy here is that we'll create a nengo node of the full padded size,
# and then connect up the input to the subset of those node elements
# corresponding to the inner, non-padded elements. so we need to figure out
# what the indices are that we need to connect to.
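# e.g. (illustrative): ZeroPadding2D(padding=1) with channels_last and input shape
# (4, 4, 2) gives output shape (6, 6, 2); the slices built below are
# [slice(1, 5), slice(1, 5)], so the input connects to the inner 4x4 block of the output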
# build slices representing the non-padded elements within the output shape
slices = []
if channels_first:
slices.append(slice(None))
for i in range(dimensions):
if dimensions == 1:
top_pad, bottom_pad = self.layer.padding
else:
top_pad, bottom_pad = self.layer.padding[i]
length = self.output_shape(node_id)[i + channels_first]
slices.append(slice(top_pad, length - bottom_pad))
# apply slices to index array to get the list of indices we want to connect to
idxs = np.arange(output.size_in).reshape(self.output_shape(node_id))
idxs = np.ravel(idxs[tuple(slices)])
# connect up the input to the appropriate indices
self.add_connection(node_id, output[idxs])
return output
@Converter.register(tf.keras.layers.ZeroPadding1D)
class ConvertZeroPadding1D(ConvertZeroPadding):
"""Convert ``tf.keras.layers.ZeroPadding1D`` to Nengo objects."""
def convert(self, node_id):
return super().convert(node_id, dimensions=1)
@Converter.register(tf.keras.layers.ZeroPadding2D)
class ConvertZeroPadding2D(ConvertZeroPadding):
"""Convert ``tf.keras.layers.ZeroPadding2D`` to Nengo objects."""
def convert(self, node_id):
return super().convert(node_id, dimensions=2)
@Converter.register(tf.keras.layers.ZeroPadding3D)
class ConvertZeroPadding3D(ConvertZeroPadding):
"""Convert ``tf.keras.layers.ZeroPadding3D`` to Nengo objects."""
def convert(self, node_id):
return super().convert(node_id, dimensions=3)