Source code for lightonml.random_projections.opu

# -*- coding: utf8
"""This module is the heart of the library, the interface to the OPU lives here.
Here are the essential functions to talk to the OPU and run computations on it.
"""
from contextlib import ExitStack
import time
import sys

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

from ..encoding.utils import get_formatting_function
from lightonopu.opu import OPU
# Progress bars in transform. Detect whether we're in a notebook,
# in this case load it in notebook mode
from .. import utils

try:
    from tqdm import tqdm
except ImportError:
    pass

class OPURandomMapping(BaseEstimator, TransformerMixin):
    """Interface to the OPU.

    .. math:: \\mathbf{y} = \\lvert \\mathbf{R} \\mathbf{x} \\rvert^2

    Parameters
    ----------
    opu : lightonopu.opu.OPU,
        optical processing unit instance linked to a physical device.
    n_components : int,
        dimensionality of the target projection space.
    position: string or callable,
        type of formatting on the DMD. Built-in options are
        ['centered', '2d_macro_pixels', '1d_square_macro_pixels',
        '1d_rectangle_macro_pixels', 'lined'].
    roi_shape: tuple of ints, defaults to (-1, -1),
        shape of the region of interest on the DMD. The default (-1, -1)
        selects the full DMD (``opu.dmd_shape``).
    roi_position: tuple of ints,
        position of the region of interest on the DMD.
        The `y` axis is oriented downwards.
    disable_pbar: boolean,
        if True, disable the progress bar of the transform.

    Attributes
    ----------
    opu : lightonopu.opu.OPU,
        optical processing unit instance linked to a physical device.
    n_components : int,
        dimensionality of the target projection space.
    position: str or callable,
        name of a built-in formatting (see Parameters) or the callable used
        to format the input for the DMD.
    roi_shape: tuple of ints,
        shape of the region of interest (ROI) on the DMD.
    roi_position: tuple of ints,
        position of the upper left corner of the ROI on the DMD.
        The `y` axis is oriented downwards.
    disable_pbar: boolean,
        if True, disable the progress bar of the transform.
    factor: int,
        number of times a single pixel is repeated. Only set when `position`
        is a string; None until `fit` has been called.
    formatting_func: callable,
        function that builds the macro-pixels in the DMD array format.
        None until `fit` has been called.
    n_features: int,
        number of features of the samples. None until `fit` has been called.
    projection_times: list of float,
        wall-clock duration in seconds of each call to `transform`.
    """

    def __init__(self, opu, n_components=1000, position='2d_macro_pixels',
                 roi_shape=(-1, -1), roi_position=(0, 0), disable_pbar=False):
        if not isinstance(opu, OPU):
            raise Exception('Please pass a valid object lightonopu.opu.OPU to the constructor')

        self.n_components = n_components
        self.opu = opu
        self.position = position
        # (-1, -1) is the sentinel for "use the whole DMD".
        if tuple(roi_shape) == (-1, -1):
            self.roi_shape = tuple(opu.dmd_shape)
        else:
            self.roi_shape = roi_shape
        self.roi_position = roi_position
        self.disable_pbar = disable_pbar

        self._dmd_shape = tuple(opu.dmd_shape)
        self.projection_times = list()
        # Filled in by fit() / build_formatting_func().
        self.factor = None
        self.formatting_func = None
        self.n_features = None

        # The ROI, once shifted by its position, must fit inside the DMD along
        # both axes. Dim 0 is checked first so it is the one reported when both
        # axes overflow.
        for axis in (0, 1):
            extent = self.roi_shape[axis] + self.roi_position[axis]
            if extent > self._dmd_shape[axis]:
                raise ValueError('Target display area is bigger than DMD area. '
                                 'Dim {}: {} > {}'.format(axis, extent, self._dmd_shape[axis]))

    def opu_mechanics_in(self, X):
        """Packs bytes to bits to send to the OPU.

        Parameters
        ----------
        X : 2D np.ndarray,
            input data in bytes (one binary value per element).

        Returns
        -------
        X_packed : 2D np.ndarray
            input data in bits, packed along axis 1 with `np.packbits`.
        """
        return np.packbits(X, axis=1)

    def opu_mechanics_out(self, X):
        """Selects n_components from the OPU output and casts to float32.

        Parameters
        ----------
        X : 2D np.ndarray,
            raw output of the OPU.

        Returns
        -------
        X_out : 2D np.ndarray
            random features: the first `n_components` columns of `X` as float32.
        """
        return X[:, :self.n_components].astype(np.float32)

    def check(self, X):
        """Safety checks for DMD input.

        Parameters
        ----------
        X : 2D np.ndarray,
            input data in bytes, already formatted for the DMD.

        Raises
        ------
        TypeError
            if `X` is not a numpy.ndarray or its dtype is not uint8.
        ValueError
            if `X` is not 2D or its number of columns differs from the number
            of DMD pixels.
        """
        # Validate the type first: reading X.shape on a non-array would fail
        # with an unrelated AttributeError instead of the intended TypeError.
        if not isinstance(X, np.ndarray):
            raise TypeError('The input has to be a numpy.ndarray.')
        if X.dtype.name != 'uint8':
            raise TypeError('The input has to be an array of numpy.uint8.')

        _, n_features = X.shape
        dmd_nb_features = self.opu.dmd_shape[0] * self.opu.dmd_shape[1]
        if n_features != dmd_nb_features:
            raise ValueError('The dimensionality of the vector has to be equal to {:d}.'.format(dmd_nb_features))

    def fit(self, X, y=None):
        """Initialize the formatting method.

        Parameters
        ----------
        X: 2D or 3D np.ndarray,
            input data; only the size of its last axis is used.
        y: np.ndarray or None, default to None,
            targets (unused).

        Returns
        -------
        self: OPURandomMapping.
        """
        self.n_features = X.shape[-1]
        self.build_formatting_func()
        return self

    def build_formatting_func(self):
        """Set `formatting_func` (and `factor` for built-in formattings) from `position`.

        Raises
        ------
        Exception
            if `position` is neither a string nor a callable.
        """
        if isinstance(self.position, str):
            self.formatting_func, self.factor = get_formatting_function(
                self.n_features, self.position, self.roi_shape,
                self.roi_position, self._dmd_shape)
        elif callable(self.position):
            # User-supplied formatter: used as is, `factor` is left untouched.
            self.formatting_func = self.position
        else:
            raise Exception("Attribute position should be either a string or a callable")

    def transform_(self, X, y=None):
        """Performs the nonlinear random projections.

        Parameters
        ----------
        X: 2D np.ndarray,
            input data.
        y: np.ndarray or None,
            targets (unused).

        Returns
        -------
        random_features: 2D np.ndarray,
            non linear random features.

        Raises
        ------
        Exception
            whatever `opu.transform` raised on the fifth consecutive failure.
        """
        X_format = self.formatting_func(X)
        self.check(X_format)
        X_opu = self.opu_mechanics_in(X_format)

        # The OPU call is retried a few times before giving up. Only regular
        # exceptions are retried, so KeyboardInterrupt / SystemExit propagate
        # immediately.
        max_attempts = 5
        for attempt in range(1, max_attempts + 1):
            try:
                Y = self.opu.transform(X_opu)
            except Exception:
                if attempt == max_attempts:
                    # Dump what was sent to help diagnose the failure.
                    print('input shape: {:}'.format(X_opu.shape))
                    print('input type: {:}'.format(X_opu.dtype))
                    print('input max: {:}'.format(np.max(X_opu)))
                    print('input min: {:}'.format(np.min(X_opu)))
                    raise
            else:
                break

        return self.opu_mechanics_out(Y)

    def transform(self, X, y=None, n_samples_by_pass=3000):
        """Performs the nonlinear random projections batch by batch.

        Parameters
        ----------
        X: 2D or 3D np.ndarray,
            input data, containing only 0s and 1s.
        y: np.ndarray or None,
            targets (unused).
        n_samples_by_pass: int,
            number of samples passed at each iteration to the OPU.

        Returns
        -------
        Y: 2D or 3D np.ndarray,
            complete array of nonlinear random projections.

        Raises
        ------
        ValueError
            if `n_samples_by_pass` is not positive or `X` is not binary.
        """
        if n_samples_by_pass < 1:
            raise ValueError('n_samples_by_pass should be a positive integer.')
        if not ((X == 0) | (X == 1)).all():
            raise ValueError('The input array should be binary - contain only 0s and 1s.')

        # handle 3D arrays by temporary reshaping to 2D
        is_3d = len(X.shape) == 3
        if is_3d:
            output_dims = X.shape[0], X.shape[1], self.n_components
            X = X.reshape(-1, X.shape[-1])

        n_input_samples = X.shape[0]
        # If the last batch would be smaller than 101 samples, pad the sequence
        # with all-zero samples so that this batch reaches 101 samples.
        # NOTE(review): presumably a minimum batch size required by the OPU - confirm.
        min_batch_size = 101
        remainder = n_input_samples % n_samples_by_pass
        if 0 < remainder < min_batch_size:
            X = np.pad(X, ((0, min_batch_size - remainder), (0, 0)),
                       'constant', constant_values=0)

        # Check if the OPU is active already, otherwise get it to do the transform operation.
        # Anything that is entered in stack is automatically exited at the end of the `with`
        # statement. If nothing is entered, nothing is done.
        # As per https://stackoverflow.com/a/34798330/4892874
        with ExitStack() as stack:
            if not self.opu.active:
                stack.enter_context(self.opu)
            print('OPU: random projections of an array of size ({},{})'.format(*X.shape))
            t0 = time.time()

            n_samples = X.shape[0]
            # allocation of empty vector for iteration
            # NOTE(review): transform_ returns float32 values that are cast back
            # to uint8 on assignment - presumably the OPU output is uint8; confirm.
            Y = np.empty((n_samples, self.n_components), dtype=np.uint8)

            # Batch boundaries: 0, n_samples_by_pass, 2 * n_samples_by_pass, ..., n_samples
            boundaries = list(range(0, n_samples, n_samples_by_pass)) + [n_samples]

            # tqdm handles the progress bar, when it could be imported
            if 'tqdm' in sys.modules:
                pbar = tqdm(total=n_samples, desc="OPU transform", disable=self.disable_pbar)
            else:
                pbar = None

            try:
                for start, end in zip(boundaries[:-1], boundaries[1:]):
                    Y[start:end] = self.transform_(X[start:end])
                    if pbar is not None:  # TODO use "if pbar:" when tqdm issue #574 is fixed
                        pbar.update(end - start)
            finally:
                # Close the bar even when a batch fails.
                if pbar is not None:
                    pbar.close()

            self.projection_times.append(time.time() - t0)

        # drop the rows that correspond to the zero padding
        if n_input_samples != n_samples:
            Y = Y[:n_input_samples]
        # if input was 3D, reshape to original dimensions
        if is_3d:
            Y = Y.reshape(output_dims)
        return Y

    def __getstate__(self):
        """Return the instance state without `formatting_func`, which is rebuilt on load."""
        output_dict = self.__dict__.copy()
        output_dict.pop('formatting_func', None)
        return output_dict

    def __setstate__(self, state):
        """Restore the instance state and rebuild `formatting_func`."""
        self.__dict__ = state
        # __getstate__ dropped the attribute; restore the same default as __init__.
        self.formatting_func = None
        # A built-in formatting is derived from n_features, which is only set by
        # fit(). For an estimator saved before fit() there is nothing to rebuild
        # yet: fit() will build it.
        unfitted_builtin = isinstance(self.position, str) and self.n_features is None
        if not unfitted_builtin:
            self.build_formatting_func()