Source code for nengo_extras.convnet

import numpy as np

from nengo.processes import Process
from nengo.params import EnumParam, NdarrayParam, NumberParam, ShapeParam
from nengo.utils.compat import is_iterable, range


def softmax(x, axis=None):
    """Compute a numerically stable softmax.

    Parameters
    ----------
    x : array_like
        Input values. Lists and tuples are accepted as well as arrays.
    axis : int or tuple of ints, optional
        Axis (or axes) along which the outputs are normalized to sum to one.
        If ``None`` (default), the softmax is taken over all elements.

    Returns
    -------
    ndarray
        Array with the same shape as ``x``.
    """
    x = np.asarray(x)
    # Subtracting the maximum does not change the result mathematically, but
    # keeps ``exp`` from overflowing for large inputs.
    ex = np.exp(x - x.max(axis=axis, keepdims=True))
    return ex / ex.sum(axis=axis, keepdims=True)


class Conv2d(Process):
    """Perform 2-D (image) convolution on an input.

    Parameters
    ----------
    shape_in : 3-tuple (n_channels, height, width)
        Shape of the input images: channels, height, width.
    filters : array_like (n_filters, n_channels, f_height, f_width)
        Static filters to convolve with the input. Shape is number of filters,
        number of input channels, filter height, and filter width. Shape can
        also be (n_filters, out_height, out_width, n_channels, f_height,
        f_width) to apply different filters at each output position, where
        'out_height' and 'out_width' are the height and width of
        ``shape_out``. Filter height and width must both be odd.
    biases : array_like, optional
        Biases to add to outputs. The number of elements selects the layout:
        a single value (one bias for the entire output), one value per output
        element, ``n_filters`` values (one bias per filter), or
        ``out_height * out_width`` values (one bias per output position,
        shared across filters). Sizes are checked in that order.
    strides : 2-tuple (vertical, horizontal) or int
        Spacing between filter placements. If an integer is provided, the same
        spacing is used in both dimensions.
    padding : 2-tuple (vertical, horizontal) or int
        Amount of zero-padding around the outside of the input image. Padding
        is applied to both sides, e.g. ``padding=(1, 0)`` will add one pixel
        of padding to the top and bottom, and none to the left and right.
    border : "ceil" or "floor"
        Rounding applied to ``(2 * padding + size - f_size) / stride`` when
        computing the output height and width.

    Attributes
    ----------
    shape_out : 3-tuple (n_filters, height, width)
        Shape of the output, computed from the other parameters.

    Raises
    ------
    ValueError
        If ``filters`` is not 4- or 6-dimensional, if its channel count does
        not match ``shape_in``, if the filter height or width is even, if
        local filters do not match the output shape, or if the number of
        biases matches none of the supported layouts.
    """

    shape_in = ShapeParam('shape_in', length=3, low=1)
    shape_out = ShapeParam('shape_out', length=3, low=1)
    strides = ShapeParam('strides', length=2, low=1)
    padding = ShapeParam('padding', length=2)
    filters = NdarrayParam('filters', shape=('...',))
    biases = NdarrayParam('biases', shape=('...',), optional=True)
    border = EnumParam('border', values=('floor', 'ceil'))

    def __init__(self, shape_in, filters, biases=None, strides=1, padding=0,
                 border='ceil'):  # noqa: C901
        self.shape_in = shape_in
        self.filters = filters
        if self.filters.ndim not in [4, 6]:
            raise ValueError(
                "`filters` must have four or six dimensions "
                "(filters, [height, width,] channels, f_height, f_width)")
        if self.filters.shape[-3] != self.shape_in[0]:
            raise ValueError(
                "Filter channels (%d) and input channels (%d) must match"
                % (self.filters.shape[-3], self.shape_in[0]))
        if not all(s % 2 == 1 for s in self.filters.shape[-2:]):
            raise ValueError("Filter shapes must be odd (got %r)"
                             % (self.filters.shape[-2:],))

        # Scalars are expanded to (vertical, horizontal) pairs
        self.strides = strides if is_iterable(strides) else [strides] * 2
        self.padding = padding if is_iterable(padding) else [padding] * 2
        self.border = border

        # Output size: one placement, plus however many strides fit in the
        # padded image (rounded according to `border`)
        nf = self.filters.shape[0]
        nxi, nxj = self.shape_in[1:]
        si, sj = self.filters.shape[-2:]
        pi, pj = self.padding
        sti, stj = self.strides
        rounder = np.ceil if self.border == 'ceil' else np.floor
        nyi = 1 + max(int(rounder(float(2*pi + nxi - si) / sti)), 0)
        nyj = 1 + max(int(rounder(float(2*pj + nxj - sj) / stj)), 0)
        self.shape_out = (nf, nyi, nyj)
        if self.filters.ndim == 6 and self.filters.shape[1:3] != (nyi, nyj):
            raise ValueError("Number of local filters %r must match out shape "
                             "%r" % (self.filters.shape[1:3], (nyi, nyj)))

        self.biases = biases
        if self.biases is not None:
            # Reshape the biases so they broadcast against an output of shape
            # (n, n_filters, height, width)
            if self.biases.size == 1:
                self.biases.shape = (1, 1, 1)
            elif self.biases.size == np.prod(self.shape_out):
                self.biases.shape = self.shape_out
            elif self.biases.size == self.shape_out[0]:
                self.biases.shape = (self.shape_out[0], 1, 1)
            elif self.biases.size == np.prod(self.shape_out[1:]):
                self.biases.shape = (1,) + self.shape_out[1:]
            else:
                raise ValueError(
                    "Biases size (%d) does not match output shape %s"
                    % (self.biases.size, self.shape_out))

        super(Conv2d, self).__init__(
            default_size_in=np.prod(self.shape_in),
            default_size_out=np.prod(self.shape_out))

    def make_step(self, shape_in, shape_out, dt, rng):
        """Build the step function that convolves a flattened input.

        ``shape_in`` and ``shape_out`` are only checked for total size; the
        shapes stored on the process are the ones used. ``dt`` and ``rng``
        are unused.
        """
        assert np.prod(shape_in) == np.prod(self.shape_in)
        assert np.prod(shape_out) == np.prod(self.shape_out)
        shape_in, shape_out = self.shape_in, self.shape_out

        filters = self.filters
        local_filters = filters.ndim == 6
        biases = self.biases

        nc, nxi, nxj = shape_in
        nf, nyi, nyj = shape_out
        si, sj = filters.shape[-2:]
        pi, pj = self.padding
        sti, stj = self.strides

        def step_conv2d(t, x):
            x = x.reshape(-1, nc, nxi, nxj)
            n = x.shape[0]
            y = np.zeros((n, nf, nyi, nyj), dtype=x.dtype)

            for i in range(nyi):
                # Rows of the image covered by this filter placement
                i0 = i*sti - pi
                xi0, xi1 = max(i0, 0), min(i0 + si, nxi)
                if xi1 <= xi0:
                    # Placement lies entirely outside the image. Skip it
                    # explicitly: letting the bounds go negative would wrap
                    # around and mismatch the filter and input slices.
                    continue
                sli = slice(xi0 - i0, xi1 - i0)

                for j in range(nyj):
                    j0 = j*stj - pj
                    xj0, xj1 = max(j0, 0), min(j0 + sj, nxj)
                    if xj1 <= xj0:
                        continue
                    slj = slice(xj0 - j0, xj1 - j0)

                    # Only the part of the filter overlapping the image is
                    # used, which is equivalent to zero-padding the input
                    w = (filters[:, i, j, :, sli, slj] if local_filters else
                         filters[:, :, sli, slj])
                    xij = x[:, :, xi0:xi1, xj0:xj1]
                    y[:, :, i, j] = np.dot(
                        xij.reshape(n, -1), w.reshape(nf, -1).T)

            if biases is not None:
                y += biases

            return y.ravel()

        return step_conv2d
class Pool2d(Process):
    """Perform 2-D (image) pooling on an input.

    Parameters
    ----------
    shape_in : 3-tuple (channels, height, width)
        Shape of the input image.
    pool_size : 2-tuple (vertical, horizontal) or int
        Shape of the pooling region. If an integer is provided, the shape will
        be square with the given side length.
    strides : 2-tuple (vertical, horizontal) or int
        Spacing between pooling placements. If ``None`` (default), will be
        equal to ``pool_size`` resulting in non-overlapping pooling. Must not
        exceed ``pool_size`` in either dimension.
    kind : "avg" or "max"
        Type of pooling to perform: average pooling or max pooling.
    mode : "full" or "valid"
        If the input image does not divide into an integer number of pooling
        regions, whether to add partial pooling regions for the extra
        pixels ("full"), or discard extra input pixels ("valid").

    Attributes
    ----------
    shape_out : 3-tuple (channels, height, width)
        Shape of the output image.

    Raises
    ------
    ValueError
        If a stride is larger than the pooling size in the same dimension.
    """

    shape_in = ShapeParam('shape_in', length=3, low=1)
    shape_out = ShapeParam('shape_out', length=3, low=1)
    pool_size = ShapeParam('pool_size', length=2, low=1)
    strides = ShapeParam('strides', length=2, low=1)
    kind = EnumParam('kind', values=('avg', 'max'))
    mode = EnumParam('mode', values=('full', 'valid'))

    def __init__(self, shape_in, pool_size, strides=None,
                 kind='avg', mode='full'):
        self.shape_in = shape_in
        self.pool_size = (pool_size if is_iterable(pool_size) else
                          [pool_size] * 2)
        if is_iterable(strides):
            self.strides = strides
        elif strides is not None:
            self.strides = [strides] * 2
        else:
            self.strides = self.pool_size
        self.kind = kind
        self.mode = mode
        if not all(st <= p for st, p in zip(self.strides, self.pool_size)):
            raise ValueError("Strides %s must be <= pool_size %s" %
                             (self.strides, self.pool_size))

        # Output size: one region, plus however many strides fit in the
        # remaining image (rounded up for "full", down for "valid")
        nc, nxi, nxj = self.shape_in
        nyi_float = float(nxi - self.pool_size[0]) / self.strides[0]
        nyj_float = float(nxj - self.pool_size[1]) / self.strides[1]
        if self.mode == 'full':
            nyi = 1 + int(np.ceil(nyi_float))
            nyj = 1 + int(np.ceil(nyj_float))
        elif self.mode == 'valid':
            nyi = 1 + int(np.floor(nyi_float))
            nyj = 1 + int(np.floor(nyj_float))
        self.shape_out = (nc, nyi, nyj)

        super(Pool2d, self).__init__(
            default_size_in=np.prod(self.shape_in),
            default_size_out=np.prod(self.shape_out))

    def make_step(self, shape_in, shape_out, dt, rng):
        """Build the step function that pools a flattened input.

        ``shape_in`` and ``shape_out`` are only checked for total size; the
        shapes stored on the process are the ones used. ``dt`` and ``rng``
        are unused.
        """
        assert np.prod(shape_in) == np.prod(self.shape_in)
        assert np.prod(shape_out) == np.prod(self.shape_out)
        nc, nxi, nxj = self.shape_in
        nc, nyi, nyj = self.shape_out
        si, sj = self.pool_size
        sti, stj = self.strides
        kind = self.kind
        nxi2, nxj2 = nyi * sti, nyj * stj

        def step_pool2d(t, x):
            x = x.reshape(-1, nc, nxi, nxj)
            y = np.zeros((x.shape[0], nc, nyi, nyj), dtype=x.dtype)
            n = np.zeros((nyi, nyj))

            # Loop over offsets within the pooling window; each strided slice
            # picks the element at that offset from every window at once
            for i in range(si):
                for j in range(sj):
                    xij = x[:, :, i:min(nxi2+i, nxi):sti,
                            j:min(nxj2+j, nxj):stj]
                    ni, nj = xij.shape[-2:]
                    if kind == 'max':
                        if i == 0 and j == 0:
                            # The first offset belongs to every window, so it
                            # seeds the running maximum. Starting from zeros
                            # instead would clamp all-negative windows to 0.
                            y[:, :, :ni, :nj] = xij
                        else:
                            y[:, :, :ni, :nj] = np.maximum(
                                y[:, :, :ni, :nj], xij)
                    elif kind == 'avg':
                        y[:, :, :ni, :nj] += xij
                        n[:ni, :nj] += 1
                    else:
                        raise NotImplementedError(kind)

            if kind == 'avg':
                # Partial windows (in "full" mode) are divided by the number
                # of elements they actually contain
                y /= n

            return y.ravel()

        return step_pool2d
class PresentJitteredImages(Process):
    """Present images in sequence, cropped at a randomly drifting offset.

    Each image is shown for ``presentation_time`` seconds, cycling back to
    the first image after the last. On every step the crop offset is pulled
    toward the centre of its range, perturbed by Gaussian noise, clipped to
    stay inside the image, and a sub-image of size ``output_shape`` is
    interpolated at that offset.

    Parameters
    ----------
    images : array_like (n_images, channels, height, width)
        Images to present.
    presentation_time : float
        Time (in seconds) for which each image is presented.
    output_shape : 2-tuple (height, width)
        Shape of the sub-image that is output for each channel.
    jitter_std : float, optional
        Standard deviation of the jitter, in pixels. If ``None`` (default),
        a quarter of the difference between the image size and the output
        size is used in each dimension.
    jitter_tau : float, optional
        Time constant of the jitter. If ``None`` (default), it is set to
        ``presentation_time``.
    **kwargs
        Passed on to the ``Process`` constructor.

    Attributes
    ----------
    image_shape : 3-tuple (channels, height, width)
        Shape of a single image, taken from ``images``.
    """

    images = NdarrayParam('images', shape=('...',))
    image_shape = ShapeParam('image_shape', length=3, low=1)
    output_shape = ShapeParam('output_shape', length=2, low=1)
    presentation_time = NumberParam('presentation_time', low=0, low_open=True)
    jitter_std = NumberParam('jitter_std', low=0, low_open=True, optional=True)
    jitter_tau = NumberParam('jitter_tau', low=0, low_open=True)

    def __init__(self, images, presentation_time, output_shape,
                 jitter_std=None, jitter_tau=None, **kwargs):
        import scipy.ndimage  # noqa: F401
        # ^ required for simulation, so check it here

        self.images = images
        self.presentation_time = presentation_time
        # Read the shape from the stored parameter rather than the raw
        # argument, so that non-array inputs (e.g. nested lists) also work
        self.image_shape = self.images.shape[1:]
        self.output_shape = output_shape
        self.jitter_std = jitter_std
        self.jitter_tau = (presentation_time if jitter_tau is None else
                           jitter_tau)

        nc = self.image_shape[0]
        nyi, nyj = self.output_shape
        super(PresentJitteredImages, self).__init__(
            default_size_in=0, default_size_out=nc*nyi*nyj, **kwargs)

    def make_step(self, shape_in, shape_out, dt, rng):
        """Build the step function that outputs a flattened jittered crop.

        ``rng`` supplies the Gaussian noise driving the jitter.
        """
        # `scipy.ndimage.shift` is the public name; the
        # `scipy.ndimage.interpolation` namespace is deprecated
        import scipy.ndimage

        nc, nxi, nxj = self.image_shape
        nyi, nyj = self.output_shape
        # Number of pixels by which the image exceeds the output
        ni, nj = nxi - nyi, nxj - nyj
        nij = np.array([ni, nj])
        assert shape_in == (0,)
        assert shape_out == (nc*nyi*nyj,)

        if self.jitter_std is None:
            si, sj = ni / 4., nj / 4.
        else:
            si = sj = self.jitter_std

        tau = self.jitter_tau

        n = len(self.images)
        images = self.images.reshape((n, nc, nxi, nxj))
        presentation_time = float(self.presentation_time)

        # The offset starts at, and is pulled back toward, `cij`
        cij = (nij - 1) / 2.
        dt_tau = dt / tau
        sigma2 = np.sqrt(2.*dt/tau) * np.array([si, sj])
        ij = cij.copy()

        def step_presentjitteredimages(t):
            # update jitter position
            ij0 = dt_tau*(cij - ij) + sigma2*rng.normal(size=2)
            ij[:] = (ij + ij0).clip((0, 0), (ni, nj))

            # select image (the small offset guards against round-off at
            # presentation boundaries)
            k = int((t-dt) / presentation_time + 1e-7)
            image = images[k % n]

            # interpolate jittered sub-image
            i, j = ij
            image = scipy.ndimage.shift(
                image, (0, ni-i, nj-j))[:, -nyi:, -nyj:]

            return image.ravel()

        return step_presentjitteredimages