import gzip
import io
import os
import re
import tarfile
import nengo
from nengo.utils.compat import is_integer, is_iterable
import numpy as np
from .compat import pickle_load_bytes, urlretrieve

data_dir = nengo.rc.get('nengo_extras', 'data_dir')


def get_file(filename, url):
filename = os.path.expanduser(filename)
if not os.path.exists(filename):
print("Retrieving %r" % url)
urlretrieve(url, filename=filename)
print("Data retrieved as %r" % filename)
return filename


def get_cifar10_tar_gz():
filename = os.path.join(data_dir, 'cifar-10-python.tar.gz')
url = 'https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz'
return get_file(filename, url)


def get_cifar100_tar_gz():
filename = os.path.join(data_dir, 'cifar-100-python.tar.gz')
url = 'https://www.cs.toronto.edu/~kriz/cifar-100-python.tar.gz'
return get_file(filename, url)


def get_ilsvrc2012_tar_gz():
filename = os.path.join(data_dir, 'ilsvrc-2012-batches-test3.tar.gz')
url = 'http://files.figshare.com/5370887/ilsvrc-2012-batches-test3.tar.gz'
return get_file(filename, url)


def get_mnist_pkl_gz():
filename = os.path.join(data_dir, 'mnist.pkl.gz')
url = 'http://deeplearning.net/data/mnist/mnist.pkl.gz'
return get_file(filename, url)


def get_svhn_tar_gz():
filename = os.path.join(data_dir, 'svhn-py-colmajor.tar.gz')
url = 'https://files.figshare.com/7868377/svhn-py-colmajor.tar.gz'
return get_file(filename, url)


def unpickle_tarfile(tar, name):
tarextract = tar.extractfile(name)
return pickle_load_bytes(tarextract)


def load_cifar10(filepath=None, n_train=5, n_test=1, label_names=False):
"""Load the CIFAR-10 dataset.

    Parameters
    ----------
    filepath : str (optional, Default: None)
        Path to the previously downloaded 'cifar-10-python.tar.gz' file.
        If ``None``, the file will be downloaded to the Nengo data directory.
    n_train : int (optional, Default: 5)
        The number of training batches to load (max: 5).
    n_test : int (optional, Default: 1)
        The number of testing batches to load (max: 1).
    label_names : boolean (optional, Default: False)
        Whether to provide the category label names.

    Returns
    -------
    train_set : (n_train, n_pixels) ndarray, (n_train,) ndarray
        A tuple of the training image array and label array.
    test_set : (n_test, n_pixels) ndarray, (n_test,) ndarray
        A tuple of the testing image array and label array.
    label_names : list
        A list of the label names (only if ``label_names=True``).
    """
if filepath is None:
filepath = get_cifar10_tar_gz()
# helper for reading each batch file
def read_tar_batch(tar, name):
data = unpickle_tarfile(tar, name)
return data[b'data'], np.array(data[b'labels'])
filepath = os.path.expanduser(filepath)
with tarfile.open(filepath, 'r:gz') as tar:
if n_train < 1:
train = (np.array([]), np.array([]))
else:
train = ([], [])
for i in range(n_train):
data, labels = read_tar_batch(
tar, 'cifar-10-batches-py/data_batch_%d' % (i+1))
train[0].append(data)
train[1].append(labels)
train = (np.vstack(train[0]), np.hstack(train[1]))
if n_test < 1:
test = (np.array([]), np.array([]))
else:
test = read_tar_batch(tar, 'cifar-10-batches-py/test_batch')
if label_names:
meta = unpickle_tarfile(tar, 'cifar-10-batches-py/batches.meta')
names = meta[b'label_names']
return (train, test) + ((names,) if label_names else ())
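
# Usage sketch for ``load_cifar10`` (a hedged example; assumes the archive is
# cached in ``data_dir`` or can be downloaded. The standard split is 5
# training batches and 1 test batch of 10000 images each):
#
#     (X_train, y_train), (X_test, y_test), names = load_cifar10(
#         label_names=True)
#     assert X_train.shape == (50000, 3072)  # 32 * 32 * 3 = 3072 pixels
#     assert len(names) == 10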


def load_cifar100(filepath=None, fine_labels=True, label_names=False):
"""Load the CIFAR-100 dataset.

    Parameters
    ----------
    filepath : str (optional, Default: None)
        Path to the previously downloaded 'cifar-100-python.tar.gz' file.
        If ``None``, the file will be downloaded to the Nengo data directory.
    fine_labels : boolean (optional, Default: True)
        Whether to provide the fine labels or the coarse labels.
    label_names : boolean (optional, Default: False)
        Whether to provide the category label names.

    Returns
    -------
    train_set : (n_train, n_pixels) ndarray, (n_train,) ndarray
        A tuple of the training image array and label array.
    test_set : (n_test, n_pixels) ndarray, (n_test,) ndarray
        A tuple of the testing image array and label array.
    label_names : list
        A list of the label names (only if ``label_names=True``).
    """
if filepath is None:
filepath = get_cifar100_tar_gz()
# helper for reading each batch file
def read_tar_batch(tar, name):
data = unpickle_tarfile(tar, name)
return data[b'data'], np.array(
data[b'fine_labels' if fine_labels else b'coarse_labels'])
filepath = os.path.expanduser(filepath)
with tarfile.open(filepath, 'r:gz') as tar:
train = read_tar_batch(tar, 'cifar-100-python/train')
test = read_tar_batch(tar, 'cifar-100-python/test')
if label_names:
meta = unpickle_tarfile(tar, 'cifar-100-python/meta')
names = meta[
b'fine_label_names' if fine_labels else b'coarse_label_names']
return (train, test) + ((names,) if label_names else ())
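
# Usage sketch for ``load_cifar100`` (a hedged example; the archive has a
# single train/test split, with 100 fine and 20 coarse superclass labels):
#
#     (X_train, y_train), (X_test, y_test) = load_cifar100(fine_labels=False)
#     # X_train.shape == (50000, 3072); y_train holds the 20 coarse labels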


def load_ilsvrc2012(filepath=None, n_files=None):
"""Load part of the ILSVRC 2012 (ImageNet) dataset.

    This loads a small section of the ImageNet Large Scale Visual Recognition
    Challenge (ILSVRC) 2012 dataset. The images are from the test portion of
    the dataset, and can be used to test pretrained classifiers.

    Parameters
    ----------
    filepath : str (optional, Default: None)
        Path to the previously downloaded 'ilsvrc-2012-batches-test3.tar.gz'.
        If ``None``, the file will be downloaded to the Nengo data directory.
    n_files : int (optional, Default: None)
        Number of files (batches) to load from the archive. Defaults to all.

    Returns
    -------
    images : (n_images, nc, ny, nx) ndarray
        The loaded images. nc = number of channels, ny = height, nx = width.
    labels : (n_images,) ndarray
        The labels of the images.
    data_mean : (nc, ny, nx) ndarray
        The mean of the images over the whole training set.
    label_names : list
        A list of the label names.
"""
import PIL.Image # ``pip install pillow``
if filepath is None:
filepath = get_ilsvrc2012_tar_gz()
# helper for reading each batch file
def read_tar_batch(tar, name):
data = unpickle_tarfile(tar, name)
return data[b'data'], data[b'labels'] # JPEG strings, labels
def bytes_to_array(b):
        image = PIL.Image.open(io.BytesIO(b))
        # ``PIL.Image.size`` is (width, height), while ``np.array`` yields a
        # (height, width, 3) pixel array, so reshape in that order
        array = np.array(image, dtype=np.uint8).reshape(
            image.size[1], image.size[0], 3)
        array = np.transpose(array, (2, 0, 1))  # to (channels, height, width)
        return array
filepath = os.path.expanduser(filepath)
with tarfile.open(filepath, 'r:gz') as tar:
names = tar.getnames()
regex = re.compile(r'.*/data_batch_([0-9]+\.[0-9]+)')
matches = [regex.match(name) for name in names]
matches = [match for match in matches if match]
batchfiles = {}
for match in matches:
batchfiles[float(match.groups()[-1])] = match.group()
raw_images = []
raw_labels = []
for key in sorted(list(batchfiles))[:n_files]:
batchfile = batchfiles[key]
x, y = read_tar_batch(tar, batchfile)
raw_images.extend(x)
raw_labels.extend(y)
n_images = len(raw_images)
image_shape = bytes_to_array(raw_images[0]).shape
images = np.zeros((n_images,) + image_shape, dtype=np.uint8)
for i, s in enumerate(raw_images):
images[i] = bytes_to_array(s)
labels = np.array(raw_labels)
labels.shape = (n_images,)
meta = unpickle_tarfile(tar, 'batches.meta')
data_mean = meta[b'data_mean'].reshape(image_shape)
label_names = meta[b'label_names']
return images, labels, data_mean, label_names
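
# Usage sketch for ``load_ilsvrc2012`` (a hedged example; the archive stores
# 256 x 256 RGB images, as the metadata mean shape suggests):
#
#     images, labels, data_mean, label_names = load_ilsvrc2012(n_files=1)
#     # images.shape == (n_images, 3, 256, 256); subtract ``data_mean``
#     # before passing the images to a pretrained classifier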


def load_ilsvrc2012_metadata(filepath=None):
    """Load the image mean and label names from the ILSVRC-2012 archive."""
if filepath is None:
filepath = get_ilsvrc2012_tar_gz()
filepath = os.path.expanduser(filepath)
with tarfile.open(filepath, 'r:gz') as tar:
meta = unpickle_tarfile(tar, 'batches.meta')
data_mean = meta[b'data_mean'].reshape((3, 256, 256))
label_names = meta[b'label_names']
return data_mean, label_names


def load_mnist(filepath=None, validation=False):
"""Load the MNIST dataset.

    Parameters
    ----------
    filepath : str (optional, Default: None)
        Path to the previously downloaded 'mnist.pkl.gz' file.
        If ``None``, the file will be downloaded to the Nengo data directory.
    validation : boolean (optional, Default: False)
        Whether to provide the validation data as a separate set (True),
        or combine it into the training data (False).

    Returns
    -------
    train_set : (n_train, n_pixels) ndarray, (n_train,) ndarray
        A tuple of the training image array and label array.
    validation_set : (n_valid, n_pixels) ndarray, (n_valid,) ndarray
        A tuple of the validation image array and label array
        (only if ``validation=True``).
    test_set : (n_test, n_pixels) ndarray, (n_test,) ndarray
        A tuple of the testing image array and label array.
    """
if filepath is None:
filepath = get_mnist_pkl_gz()
filepath = os.path.expanduser(filepath)
with gzip.open(filepath, 'rb') as f:
train_set, valid_set, test_set = pickle_load_bytes(f)
if validation:
return train_set, valid_set, test_set
else: # combine valid into train
train_set = (np.vstack((train_set[0], valid_set[0])),
np.hstack((train_set[1], valid_set[1])))
return train_set, test_set
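
# Usage sketch for ``load_mnist``; with the default ``validation=False`` the
# 10000 validation images are folded into the training set:
#
#     (X_train, y_train), (X_test, y_test) = load_mnist()
#     assert X_train.shape == (60000, 784)
#     assert X_test.shape == (10000, 784)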


def load_svhn(filepath=None, n_train=9, n_test=3, data_mean=False,
label_names=False):
"""Load the SVHN dataset.

    Parameters
    ----------
    filepath : str (optional, Default: None)
        Path to the previously downloaded 'svhn-py-colmajor.tar.gz' file.
        If ``None``, the file will be downloaded to the Nengo data directory.
    n_train : int (optional, Default: 9)
        The number of training batches to load (max: 9).
    n_test : int (optional, Default: 3)
        The number of testing batches to load (max: 3).
    data_mean : boolean (optional, Default: False)
        Whether to provide the mean image stored in the dataset metadata.
    label_names : boolean (optional, Default: False)
        Whether to provide the category label names.

    Returns
    -------
    train_set : (n_images, nc, ny, nx) ndarray, (n_images,) ndarray
        A tuple of the training image array and label array.
    test_set : (n_images, nc, ny, nx) ndarray, (n_images,) ndarray
        A tuple of the testing image array and label array.
    data_mean : (nc, ny, nx) ndarray
        The mean image (only if ``data_mean=True``).
    label_names : list
        A list of the label names (only if ``label_names=True``).
    """
shape = (3, 32, 32)
if filepath is None:
filepath = get_svhn_tar_gz()
def read_tar_batch(tar, name):
data = unpickle_tarfile(tar, name)
return data[b'data'], np.array(data[b'labels'])
def load_batches(tar, inds):
if len(inds) < 1:
return (np.array([]), np.array([]))
batches = ([], [])
for i in inds:
data, labels = read_tar_batch(
tar, 'svhn-py-colmajor/data_batch_%d' % i)
batches[0].append(data.T)
batches[1].append(labels)
return (np.vstack(batches[0]).reshape((-1,) + shape),
np.hstack(batches[1]))
filepath = os.path.expanduser(filepath)
with tarfile.open(filepath, 'r:gz') as tar:
train = load_batches(tar, list(range(1, n_train+1)))
test = load_batches(tar, list(range(10, n_test+10)))
if label_names or data_mean:
meta = unpickle_tarfile(tar, 'svhn-py-colmajor/batches.meta')
data_mean = (meta[b'data_mean'].reshape(shape),) if data_mean else ()
label_names = (meta[b'label_names'],) if label_names else ()
return (train, test) + data_mean + label_names
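
# Usage sketch for ``load_svhn`` (a hedged example; image shapes follow the
# (3, 32, 32) layout used by the py-colmajor packaging):
#
#     (X_train, y_train), (X_test, y_test) = load_svhn(n_train=1, n_test=1)
#     # X_train.shape == (n_images, 3, 32, 32) for one training batch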


def spasafe_name(name, pre_comma_only=True):
"""Make a name safe to use as a SPA semantic pointer name.

    Ensures the name conforms with SPA name standards: replaces spaces,
    hyphens, and commas with underscores, removes other invalid characters,
    and capitalizes the first letter.

    Parameters
    ----------
    name : str
        The name to convert.
    pre_comma_only : boolean
        Only use the part of the name before the first comma.
    """
if len(name) == 0:
raise ValueError("Empty name.")
if pre_comma_only and ',' in name:
name = name.split(',')[0] # part before first comma
name = name.strip()
    name = re.sub(r'(\s|-|,)+', '_', name)  # space/hyphen/comma -> underscore
    name = re.sub('(^[^a-zA-Z]+)|[^a-zA-Z0-9_]+', '', name)  # drop other chars
    name = name[0].upper() + name[1:]  # capitalize first letter
return name
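
# For example (the input string is illustrative only):
#
#     spasafe_name('great grey owl, Strix nebulosa')  # -> 'Great_grey_owl'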


def spasafe_names(label_names, pre_comma_only=True):
"""Make names safe to use as SPA semantic pointer names.

    Format a list of names to conform with SPA name standards. In addition
    to running each name through ``spasafe_name``, this function numbers
    duplicate names so that they are unique.

    Parameters
    ----------
    label_names : list of str
        The names to convert.
    pre_comma_only : boolean
        Only use the part of each name before the first comma.
    """
vocab_names = [spasafe_name(name, pre_comma_only=pre_comma_only)
for name in label_names]
# number duplicates
unique = set()
duplicates = []
for name in vocab_names:
if name in unique:
duplicates.append(name)
else:
unique.add(name)
duplicates = {name: 0 for name in duplicates}
for i, name in enumerate(vocab_names):
if name in duplicates:
vocab_names[i] = '%s%d' % (name, duplicates[name])
duplicates[name] += 1
return vocab_names
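
# Duplicate names are numbered in order of appearance, for example:
#
#     spasafe_names(['cat', 'dog', 'cat'])  # -> ['Cat0', 'Dog', 'Cat1']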


def one_hot_from_labels(labels, classes=None, dtype=float):
"""Turn integer labels into a one-hot encoding.

    Parameters
    ----------
    labels : (n,) array
        Labels to turn into one-hot encoding.
    classes : int or (n_classes,) array (optional)
        Classes for encoding. If integer and ``labels.dtype`` is integer, this
        is the number of classes in the encoding. If iterable, this is the
        list of classes to place in the one-hot (must be a superset of the
        unique elements in ``labels``).
    dtype : dtype (optional)
        Data type of returned one-hot encoding (defaults to ``float``).

    Returns
    -------
    y : (n, n_classes) ndarray
        The one-hot encoding of the labels.
    """
assert labels.ndim == 1
n = labels.shape[0]
if np.issubdtype(labels.dtype, np.integer) and (
classes is None or is_integer(classes)):
index = labels
index_min, index_max = index.min(), index.max()
n_classes = (index_max + 1) if classes is None else classes
assert index_min >= 0
assert index_max < n_classes
else:
if classes is not None:
assert is_iterable(classes)
assert set(np.unique(labels)).issubset(classes)
        # ensure ``classes`` is an ndarray so fancy indexing works below
        classes = np.unique(labels) if classes is None else np.asarray(classes)
n_classes = len(classes)
c_index = np.argsort(classes)
c_sorted = classes[c_index]
index = c_index[np.searchsorted(c_sorted, labels)]
y = np.zeros((n, n_classes), dtype=dtype)
y[np.arange(n), index] = 1
return y
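
# A minimal example: three integer labels drawn from four classes. The
# returned array has dtype ``float`` by default.
#
#     y = one_hot_from_labels(np.array([1, 0, 3]), classes=4)
#     # y == [[0., 1., 0., 0.],
#     #       [1., 0., 0., 0.],
#     #       [0., 0., 0., 1.]]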


class ZCAWhiten(object):
"""ZCA Whitening
References
----------
.. [1] Krizhevsky, Alex. "Learning multiple layers of features from tiny
images" (2009) MSc Thesis, Dept. of Comp. Science, Univ. of
Toronto. pp. 48-49.
"""

    def __init__(self, beta=1e-2, gamma=1e-5):
self.beta = beta
self.gamma = gamma
self.dims = None
self.pixel_mu = None
self.e = None
self.V = None
self.Sinv = None

    def contrast_normalize(self, X, remove_mean=True, beta=None,
                           hard_beta=True):
        """Normalize the contrast of each row (example) of ``X``."""
X = np.asarray(X, dtype=np.float64)
if X.ndim != 2:
raise ValueError('contrast_normalize requires flat patches')
Xc = X - X.mean(axis=1)[:, None] if remove_mean else X
l2 = (Xc * Xc).sum(axis=1)
beta = self.beta if beta is None else beta
div2 = np.maximum(l2, beta) if hard_beta else l2 + beta
return Xc / np.sqrt(div2[:, None])

    def fit(self, X):
"""Fit whitening transform to training data
Parameters
----------
X : array_like
Flattened data, with each row corresponding to one example
"""
X = self.contrast_normalize(X)
self.dims = X.shape[1]
self.pixel_mu = X.mean(axis=0)
X -= self.pixel_mu[None, :] # each pixel has zero mean
S = np.dot(X.T, X) / (X.shape[0] - 1)
e, V = np.linalg.eigh(S)
self.e = e
self.V = V
self.Sinv = np.dot(np.sqrt(1.0 / (e + self.gamma)) * V, V.T)
return np.dot(X, self.Sinv)

    def transform(self, X):
        """Apply the fitted whitening transform to new data."""
        assert self.dims is not None
        X = self.contrast_normalize(X, beta=self.beta)
        assert X.shape[1] == self.dims
        X -= self.pixel_mu[None, :]
        return np.dot(X, self.Sinv)
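
# Usage sketch for ``ZCAWhiten`` (a hedged example; assumes ``X_train`` and
# ``X_test`` are 2-D arrays of flattened images, e.g. from ``load_cifar10``):
#
#     zca = ZCAWhiten()
#     X_train_white = zca.fit(X_train)      # ``fit`` returns the whitened data
#     X_test_white = zca.transform(X_test)  # reuse the fitted transform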