import warnings
import numpy as np
import nengo.utils.numpy as npext
from nengo.base import Process
from nengo.dists import DistributionParam, Gaussian
from nengo.exceptions import ValidationError
from nengo.params import BoolParam, DictParam, EnumParam, NdarrayParam, NumberParam
from nengo.synapses import LinearFilter, Lowpass, SynapseParam
from nengo.utils.numpy import is_number
[docs]class WhiteNoise(Process):
"""Full-spectrum white noise process.
Parameters
----------
dist : Distribution, optional
The distribution from which to draw samples.
scale : bool, optional
Whether to scale the white noise for integration. Integrating white
noise requires using a time constant of ``sqrt(dt)`` instead of ``dt``
on the noise term [1]_, to ensure the magnitude of the integrated
noise does not change with ``dt``.
seed : int, optional
Random number seed. Ensures noise will be the same each run.
References
----------
.. [1] Gillespie, D.T. (1996) Exact numerical simulation of the Ornstein-
Uhlenbeck process and its integral. Phys. Rev. E 54, pp. 2084-91.
"""
dist = DistributionParam("dist")
scale = BoolParam("scale")
def __init__(self, dist=Gaussian(mean=0, std=1), scale=True, **kwargs):
super().__init__(default_size_in=0, **kwargs)
self.dist = dist
self.scale = scale
def __repr__(self):
return "%s(%r, scale=%r)" % (type(self).__name__, self.dist, self.scale)
[docs] def make_step(self, shape_in, shape_out, dt, rng, state):
assert shape_in == (0,)
assert len(shape_out) == 1
dist = self.dist
scale = self.scale
alpha = 1.0 / np.sqrt(dt)
# ^ need sqrt(dt) when integrating, so divide by sqrt(dt) here,
# since dt / sqrt(dt) = sqrt(dt).
def step_whitenoise(t):
x = dist.sample(n=1, d=shape_out[0], rng=rng)[0]
return alpha * x if scale else x
return step_whitenoise
[docs]class FilteredNoise(Process):
"""Filtered white noise process.
This process takes white noise and filters it using the provided synapse.
Parameters
----------
synapse : Synapse, optional
The synapse to use to filter the noise.
dist : Distribution, optional
The distribution used to generate the white noise.
scale : bool, optional
Whether to scale the white noise for integration, making the output
signal invariant to ``dt``.
seed : int, optional
Random number seed. Ensures noise will be the same each run.
"""
synapse = SynapseParam("synapse")
dist = DistributionParam("dist")
scale = BoolParam("scale")
def __init__(
self,
synapse=Lowpass(tau=0.005),
dist=Gaussian(mean=0, std=1),
scale=True,
**kwargs
):
super().__init__(default_size_in=0, **kwargs)
self.synapse = synapse
self.dist = dist
self.scale = scale
def __repr__(self):
return "%s(synapse=%r, dist=%r, scale=%r)" % (
type(self).__name__,
self.synapse,
self.dist,
self.scale,
)
[docs] def make_state(self, shape_in, shape_out, dt, dtype=None):
return self.synapse.make_state(shape_out, shape_out, dt, dtype=dtype)
[docs] def make_step(self, shape_in, shape_out, dt, rng, state):
assert shape_in == (0,)
assert len(shape_out) == 1
dist = self.dist
scale = self.scale
alpha = 1.0 / np.sqrt(dt)
filter_step = self.synapse.make_step(shape_out, shape_out, dt, rng, state)
def step_filterednoise(t):
x = dist.sample(n=1, d=shape_out[0], rng=rng)[0]
if scale:
x *= alpha
return filter_step(t, x)
return step_filterednoise
[docs]class BrownNoise(FilteredNoise):
"""Brown noise process (aka Brownian noise, red noise, Wiener process).
This process is the integral of white noise.
Parameters
----------
dist : Distribution, optional
The distribution used to generate the white noise.
seed : int, optional
Random number seed. Ensures noise will be the same each run.
"""
def __init__(self, dist=Gaussian(mean=0, std=1), **kwargs):
super().__init__(
synapse=LinearFilter([1], [1, 0], method="euler"), dist=dist, **kwargs
)
def __repr__(self):
return "%s(%r)" % (type(self).__name__, self.dist)
[docs]class WhiteSignal(Process):
"""An ideal low-pass filtered white noise process.
This signal is created in the frequency domain, and designed to have
exactly equal power at all frequencies below the cut-off frequency,
and no power above the cut-off.
The signal is naturally periodic, so it can be used beyond its period
while still being continuous with continuous derivatives.
Parameters
----------
period : float
A white noise signal with this period will be generated.
Samples will repeat after this duration.
high : float
The cut-off frequency of the low-pass filter, in Hz.
Must not exceed the Nyquist frequency for the simulation
timestep, which is ``0.5 / dt``.
rms : float, optional
The root mean square power of the filtered signal
y0 : float, optional
Align the phase of each output dimension to begin at the value
that is closest (in absolute value) to y0.
seed : int, optional
Random number seed. Ensures noise will be the same each run.
"""
period = NumberParam("period", low=0, low_open=True)
high = NumberParam("high", low=0, low_open=True)
rms = NumberParam("rms", low=0, low_open=True)
y0 = NumberParam("y0", optional=True)
def __init__(self, period, high, rms=0.5, y0=None, **kwargs):
super().__init__(default_size_in=0, **kwargs)
self.period = period
self.high = high
self.rms = rms
self.y0 = y0
if self.high is not None and self.high < 1.0 / self.period:
raise ValidationError(
"Make ``high >= 1. / period`` to produce a non-zero signal",
attr="high",
obj=self,
)
def __repr__(self):
return "%s(period=%r, high=%r, rms=%r)" % (
type(self).__name__,
self.period,
self.high,
self.rms,
)
[docs] def make_step(self, shape_in, shape_out, dt, rng, state):
assert shape_in == (0,)
nyquist_cutoff = 0.5 / dt
if self.high > nyquist_cutoff:
raise ValidationError(
"High must not exceed the Nyquist frequency "
"for the given dt (%0.3f)" % nyquist_cutoff,
attr="high",
obj=self,
)
n_coefficients = int(np.ceil(self.period / dt / 2.0))
shape = (n_coefficients + 1,) + shape_out
sigma = self.rms * np.sqrt(0.5)
coefficients = 1j * rng.normal(0.0, sigma, size=shape)
coefficients += rng.normal(0.0, sigma, size=shape)
coefficients[0] = 0.0
coefficients[-1].imag = 0.0
set_to_zero = npext.rfftfreq(2 * n_coefficients, d=dt) > self.high
coefficients[set_to_zero] = 0.0
power_correction = np.sqrt(
1.0 - np.sum(set_to_zero, dtype=float) / n_coefficients
)
if power_correction > 0.0:
coefficients /= power_correction
coefficients *= np.sqrt(2 * n_coefficients)
signal = np.fft.irfft(coefficients, axis=0)
if self.y0 is not None:
# Starts each dimension off where it is closest to y0
def shift(x):
offset = np.argmin(abs(self.y0 - x))
return np.roll(x, -offset + 1) # +1 since t starts at dt
signal = np.apply_along_axis(shift, 0, signal)
def step_whitesignal(t):
i = int(round(t / dt))
return signal[i % signal.shape[0]]
return step_whitesignal
[docs]class PiecewiseDataParam(DictParam):
"""Piecewise-specific validation for the data dictionary.
In the `.Piecewise` data dict, the keys are points in time (float) and
values are numerical constants or callables of the same dimensionality.
"""
def coerce(self, instance, data):
data = super().coerce(instance, data)
size_out = None
for time, value in data.items():
if not is_number(time):
raise ValidationError(
"Keys must be times (floats or ints), "
"not %r" % type(time).__name__,
attr="data",
obj=instance,
)
# figure out the length of this item
if callable(value):
try:
value = np.ravel(value(time))
except Exception:
raise ValidationError(
"callable object for time step %.3f "
"should return a numerical constant" % time,
attr="data",
obj=instance,
)
else:
value = np.ravel(value)
data[time] = value
size = value.size
# make sure this is the same size as previous items
if size != size_out and size_out is not None:
raise ValidationError(
"time %g has size %d instead of %d" % (time, size, size_out),
attr="data",
obj=instance,
)
size_out = size
return data
[docs]class Piecewise(Process):
"""A piecewise function with different options for interpolation.
Given an input dictionary of ``{0: 0, 0.5: -1, 0.75: 0.5, 1: 0}``,
this process will emit the numerical values (0, -1, 0.5, 0)
starting at the corresponding time points (0, 0.5, 0.75, 1).
The keys in the input dictionary must be times (float or int).
The values in the dictionary can be floats, lists of floats,
or numpy arrays. All lists or numpy arrays must be of the same length,
as the output shape of the process will be determined by the shape
of the values.
Interpolation on the data points using `scipy.interpolate` is also
supported. The default interpolation is 'zero', which creates a
piecewise function whose values change at the specified time points.
So the above example would be shortcut for:
.. testcode::
def function(t):
if t < 0.5:
return 0
elif t < 0.75:
return -1
elif t < 1:
return 0.5
else:
return 0
For times before the first specified time, an array of zeros (of
the correct length) will be emitted.
This means that the above can be simplified to:
.. testcode::
from nengo.processes import Piecewise
Piecewise({0.5: -1, 0.75: 0.5, 1: 0})
Parameters
----------
data : dict
A dictionary mapping times to the values that should be emitted
at those times. Times must be numbers (ints or floats), while values
can be numbers, lists of numbers, numpy arrays of numbers,
or callables that return any of those options.
interpolation : str, optional
One of 'linear', 'nearest', 'slinear', 'quadratic', 'cubic', or 'zero'.
Specifies how to interpolate between times with specified value.
'zero' creates a plain piecewise function whose values begin at
corresponding time points, while all other options interpolate
as described in `scipy.interpolate`.
Attributes
----------
data : dict
A dictionary mapping times to the values that should be emitted
at those times. Times are numbers (ints or floats), while values
can be numbers, lists of numbers, numpy arrays of numbers,
or callables that return any of those options.
interpolation : str
One of 'linear', 'nearest', 'slinear', 'quadratic', 'cubic', or 'zero'.
Specifies how to interpolate between times with specified value.
'zero' creates a plain piecewise function whose values change at
corresponding time points, while all other options interpolate
as described in `scipy.interpolate`.
Examples
--------
.. testcode::
from nengo.processes import Piecewise
process = Piecewise({0.5: 1, 0.75: -1, 1: 0})
with nengo.Network() as model:
u = nengo.Node(process, size_out=process.default_size_out)
up = nengo.Probe(u)
with nengo.Simulator(model, progress_bar=False) as sim:
sim.run(1.5)
f = sim.data[up]
t = sim.trange()
print(f[t == 0.2])
print(f[t == 0.58])
.. testoutput::
[[ 0.]]
[[ 1.]]
"""
data = PiecewiseDataParam("data", readonly=True)
interpolation = EnumParam(
"interpolation",
values=("zero", "linear", "nearest", "slinear", "quadratic", "cubic"),
)
def __init__(self, data, interpolation="zero", **kwargs):
self.data = data
needs_scipy = ("linear", "nearest", "slinear", "quadratic", "cubic")
if interpolation in needs_scipy:
self.sp_interpolate = None
if any(callable(val) for val in self.data.values()):
warnings.warn(
"%r interpolation cannot be applied because "
"a callable was supplied for some piece of the "
"function. Using 'zero' interpolation instead." % (interpolation,)
)
interpolation = "zero"
else:
try:
import scipy.interpolate # pylint: disable=import-outside-toplevel
self.sp_interpolate = scipy.interpolate
except ImportError:
warnings.warn(
"%r interpolation cannot be applied because "
"scipy is not installed. Using 'zero' "
"interpolation instead." % (interpolation,)
)
interpolation = "zero"
self.interpolation = interpolation
super().__init__(default_size_in=0, default_size_out=self.size_out, **kwargs)
@property
def size_out(self):
time, value = next(iter(self.data.items()))
value = np.ravel(value(time)) if callable(value) else value
return value.size
[docs] def make_step(self, shape_in, shape_out, dt, rng, state):
tp, yp = zip(*sorted(self.data.items()))
assert shape_in == (0,)
assert shape_out == (self.size_out,)
if self.interpolation == "zero":
def step_piecewise(t):
ti = (np.searchsorted(tp, t + 0.5 * dt) - 1).clip(-1, len(yp) - 1)
if ti == -1:
return np.zeros(shape_out)
else:
return np.ravel(yp[ti](t)) if callable(yp[ti]) else yp[ti]
else:
assert self.sp_interpolate is not None
if self.interpolation == "cubic" and 0 not in tp:
warnings.warn(
"'cubic' interpolation may fail if data not specified for t=0.0"
)
f = self.sp_interpolate.interp1d(
tp,
yp,
axis=0,
kind=self.interpolation,
bounds_error=False,
fill_value=0.0,
)
def step_piecewise(t):
return np.ravel(f(t))
return step_piecewise