Commit 1d964d61 authored by hazrmard

added Recurrent integrator models in python

parent 94f08eb8
%% Cell type:code id: tags:
``` python
%reload_ext autoreload
%autoreload 2
import numpy as np
from numpy.random import randn
import torch
import torch.nn as nn
import torch.nn.functional as F
from models import TorchEstimator, Accumulator, AffineIntegrator, System
from state_space import SS
```
%% Cell type:code id: tags:
``` python
xrand = torch.rand(100, 20, 1) - 0.5
xneg = torch.rand(100, 20, 1) - 1
xpos = torch.rand(100, 20, 1)
yrand = np.cumsum(xrand, axis=0)
yneg = np.cumsum(xneg, axis=0)
ypos = np.cumsum(xpos, axis=0)
```
%% Cell type:markdown id: tags:
## Anatomy of a dynamic model
![anatomy](../docs/img/dynamic_system_anatomy.png)
A dynamic system consists of 3 parts:
1. **State dynamics**: The system determines the rate of change of its internal state in response to its current state and outside stimulus. This can be an instantaneous/feed-forward mapping from inputs to rate of change.
2. **State**: The rates of change of state are accumulated to determine the current state. The system has *memory*.
3. **Output**: The system determines the output from its current state. It can be a feed-forward mapping from state to output.
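The loop below is a minimal, illustrative sketch of these three parts. The names `F`, `G`, and `run_dynamic_system` are made up for this example and are not part of `models.py`.
``` python
import numpy as np

def run_dynamic_system(F, G, u_seq, x0=0.0, dt=1.0):
    x, outputs = x0, []
    for u in u_seq:
        dx = F(x, u)                # 1. state dynamics: map state + input to a rate of change
        x = x + dt * dx             # 2. state: accumulate the rate of change (memory)
        outputs.append(G(x, u))     # 3. output: map the current state (and input) to an output
    return np.asarray(outputs)

# A pure integrator: F(x, u) = u and G(x, u) = x reproduce a cumulative sum.
run_dynamic_system(lambda x, u: u, lambda x, u: x, np.ones(5))
# -> array([1., 2., 3., 4., 5.])
```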
%% Cell type:code id: tags:
``` python
m = Accumulator()
reg = TorchEstimator(module=m, optimizer=None, epochs=500,
loss=None, batch_size=None, verbose=False, cuda=False)
# reg.fit(xrand, yrand) # <= Accumulator has no trainable parameters
print(reg.score(xrand, yrand))
print(reg.predict(torch.ones(10,1,1)).squeeze())
```
%%%% Output: stream
tensor(0.) tensor(9644.0918)
1.0
tensor([ 1., 2., 3., 4., 5., 6., 7., 8., 9., 10.])
%% Cell type:code id: tags:
``` python
m = AffineIntegrator(fixed=False)
reg = TorchEstimator(module=m, optimizer=torch.optim.Adam(m.parameters(), lr=0.01), epochs=100,
loss=nn.MSELoss(), batch_size=10, verbose=False, cuda=False)
reg.fit(xrand, yrand)
print(reg.score(xrand, yrand))
print(reg.predict(torch.ones(10,1,1)).squeeze())
```
%%%% Output: display_data
%%%% Output: stream
-368458.125
tensor([ 0.6547, 1.6504, 2.7213, 3.8730, 5.1116, 6.4437, 7.8763, 9.4171,
11.0741, 12.8562])
%% Cell type:code id: tags:
``` python
m = System(fixed=False)
reg = TorchEstimator(module=m, optimizer=torch.optim.Adam(m.parameters(), lr=0.01), epochs=100,
loss=nn.MSELoss(), batch_size=10, verbose=False, cuda=False)
reg.fit(xrand, yrand)
print(reg.score(xrand, yrand))
print(reg.predict(torch.ones(10,1,1)).squeeze())
```
%%%% Output: display_data
%%%% Output: stream
-182146056192.0
tensor([ 0.7766, 1.6182, 2.5302, 3.5186, 4.5896, 5.7502, 7.0079, 8.3708,
9.8477, 11.4483])
%% Cell type:code id: tags:
``` python
k = 1 # spring constant
a = 0.5 # drag coefficient
m = 1 # mass
A = np.asarray([[0, 1], [-k/m, -a/m]])
B = np.asarray([[0],[1/m]])
C = np.asarray([[1, 0]])
D = np.asarray([[0]])
model = SS(A,B,C,D,tstep=1e-3)
U = randn(1000, 1)
Y, _ = model.run(U, 0.5)
print(f'Input shape:{U.shape}, output shape: {Y.shape}')
```
%%%% Output: stream
Input shape:(1000, 1), output shape: (1000, 1)
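%% Cell type:markdown id: tags:
For reference, and assuming the intended reading of `k`, `a`, `m` as spring constant, drag coefficient, and mass: the matrices above are the state-space form of a driven mass-spring-damper with position $q$ and state $x = [q, \dot q]^T$,
$$m \ddot q = -k q - a \dot q + u \quad\Longrightarrow\quad \dot x = A x + B u, \qquad y = C x + D u = q.$$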
%% Cell type:code id: tags:
``` python
seqY = torch.from_numpy(Y).view(-1, 1, 1).float()
seqU = torch.from_numpy(U).view(-1, 1, 1).float()
m = AffineIntegrator(input_size=1, fixed=False)
reg = TorchEstimator(module=m, optimizer=torch.optim.Adam(m.parameters(), lr=0.01), epochs=20,
loss=nn.MSELoss(), batch_size=1, verbose=False, cuda=False)
reg.fit(seqU, seqY)
reg.score(seqU, seqY)
```
%%%% Output: display_data
%%%% Output: execute_result
-3.7041192054748535
%% Cell type:code id: tags:
``` python
seqY = torch.from_numpy(Y).view(-1, 1, 1).float()
seqU = torch.from_numpy(U).view(-1, 1, 1).float()
AB = nn.Linear(3, 2)
CD = nn.Linear(3, 1)
m = System(dx=AB, y=CD, input_size=1, state_size=2, output_size=1)
reg = TorchEstimator(module=m, optimizer=torch.optim.Adam(m.parameters(), lr=0.01), epochs=20,
loss=nn.MSELoss(), batch_size=1, verbose=False, cuda=False)
reg.fit(seqU, seqY)
reg.score(seqU, seqY)
```
%%%% Output: display_data
%%%% Output: execute_result
-3.7753424644470215
%% Cell type:code id: tags:
``` python
mfc = nn.Sequential(nn.Linear(1,3), nn.Tanh(), # 6
nn.Linear(3,3), nn.Tanh(), # 12
nn.Linear(3,4), nn.Tanh(), # 16
nn.Linear(4,1)) # 5
reg = TorchEstimator(module=mfc, optimizer=torch.optim.Adam(mfc.parameters(), lr=0.01), epochs=20,
loss=nn.MSELoss(), batch_size=10, verbose=False, cuda=False)
reg.fit(torch.from_numpy(U).float(), torch.from_numpy(Y).float())
reg.score(torch.from_numpy(U).float(), torch.from_numpy(Y).float())
```
%%%% Output: display_data
%%%% Output: execute_result
-3.7677178382873535
"""
Utility definitions for Models.ipynb
"""
from multiprocessing import Pool
from os import cpu_count
from typing import Iterable, Tuple, Iterator
import numpy as np
from sklearn import clone
from sklearn.neural_network import MLPRegressor
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm.auto import trange
def _fit(*args):
"""
Multi-processing payload function used by `fit_composite_model()`.
"""
est, (x, y) = args
return est.fit(x, y)
def fit_composite_model(estimator: MLPRegressor,
data: Iterable[Tuple[np.ndarray, np.ndarray]]) -> Iterable[MLPRegressor]:
"""
Fits copies of an estimator to different datasets in parallel.
Args:
* `estimator`: An estimator instance with a `fit(X, y)` method which returns
the instance.
* `data`: An iterable of `(X, y)` tuples of arrays, one tuple per dataset: `[(X1, y1), (X2, y2), ...]`.
Returns:
* A list of fitted estimators.
"""
data = list(data)
estimators = [clone(estimator) for _ in data]
with Pool(min(len(data), cpu_count())) as pool:
return pool.starmap(_fit, zip(estimators, data))
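# Illustrative usage sketch (an assumption, not exercised by the notebook):
# fit copies of an MLPRegressor to two datasets in parallel. The arrays are
# made up, and the call should sit under an `if __name__ == '__main__':`
# guard when the 'spawn' multiprocessing start method is in use.
#
#   reg = MLPRegressor(hidden_layer_sizes=(8,), max_iter=500)
#   data = [(np.random.rand(50, 3), np.random.rand(50)),
#           (np.random.rand(80, 3), np.random.rand(80))]
#   fitted = fit_composite_model(reg, data)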
class TorchEstimator:
"""
Wraps a `torch.nn.Module` instance with a scikit-learn `Estimator` API.
Args:
* `module`: A `nn.Module` describing the neural network,
* `optimizer`: An `Optimizer` instance which iteratively modifies weights,
* `loss`: a `_Loss` instance which calculates the loss metric,
* `epochs`: The number of times to iterate over the training data,
* `verbose`: Whether to log training progress or not,
* `batch_size`: Chunk size of data for each training step,
* `cuda`: Whether to use GPU acceleration if available.
"""
def __init__(self, module: nn.Module, optimizer: optim.Optimizer,
loss: nn.modules.loss._Loss, epochs: int=10, verbose=True,
batch_size: int=8, cuda=True):
self.module = module
self.optimizer = optimizer
self.loss = loss
self.epochs = epochs
self.verbose = verbose
self.batch_size = batch_size
self.batch_first = self._is_batch_first()
# pylint: disable=no-member
self._device = torch.device('cuda') if torch.cuda.is_available() and cuda\
else torch.device('cpu')
def fit(self, X: torch.Tensor, y: torch.Tensor) -> 'TorchEstimator':
"""
Fit target to features.
Args:
* `X`: `Tensor` of shape (SeqLen, N, Features) or (N, SeqLen, Features)
for recurrent modules or (N, Features) for other modules.
* `y`: `Tensor` of shape ([SeqLen,] N, OutputFeatures) for recurrent
modules or (N, OutputFeatures) for other modules.
Returns:
* self
"""
if self.verbose:
print()
ranger = trange(self.epochs)
self.module.to(self._device)
for e in ranger:
total_loss = 0.
for instance, target in zip(self._to_batches(X), self._to_batches(y)):
instance, target = instance.to(self._device), target.to(self._device)
self.module.zero_grad()
output = self.module(instance)
loss = self.loss(output, target)
loss.backward()
self.optimizer.step()
total_loss += loss.item()
if self.verbose:
ranger.write(f'Epoch {e+1:3d}\tLoss: {total_loss:10.2f}')
return self
def predict(self, X: torch.Tensor) -> torch.Tensor:
"""
Predict output from inputs.
Args:
* `X`: `Tensor` of shape (SeqLen, N, Features) or (N, SeqLen, Features)
for recurrent modules or (N, Features) for other modules.
Returns:
* `Tensor` of shape ([SeqLen,] N, OutputFeatures) for recurrent
modules or (N, OutputFeatures) for other modules.
"""
with torch.no_grad():
result = self.module(X)
return result
def score(self, X, y_true) -> float:
"""
Measure how well the estimator learned through the coefficient of
determination.
Args:
* `X`: `Tensor` of shape (SeqLen, N, Features) or (N, SeqLen, Features)
for recurrent modules or (N, Features) for other modules.
* `y_true`: True target values for each of N instances
Returns:
* float: Coefficient of determination.
"""
y_pred = self.predict(X)
residual_squares_sum = ((y_true - y_pred) ** 2).sum()
total_squares_sum = ((y_true - y_true.mean()) ** 2).sum()
return (1 - residual_squares_sum / total_squares_sum).item()
def _to_batches(self, X: torch.Tensor) -> Iterator[torch.Tensor]:
"""
Convert ([SeqLen,] N, Features) to a generator of ([SeqLen,] n, Features)
mini-batches, so that training can be batched even for recurrent layers.
"""
if not self.batch_first:
# Recurrent layers take inputs of the shape (SeqLen, N, Features...)
# So if there is any recurrent layer in the module, assume that this
# is the expected input shape
N = X[0].size()[0]
nbatches = N // self.batch_size + (1 if N % self.batch_size else 0)
for i in range(nbatches):
yield X[:, i*self.batch_size:(i+1)*self.batch_size]
else:
# Fully connected layers take inputs of the shape (N, Features...)
N = len(X)
nbatches = N // self.batch_size + (1 if N % self.batch_size else 0)
for i in range(nbatches):
yield X[i*self.batch_size:(i+1)*self.batch_size]
def _is_recurrent(self) -> bool:
return any(map(lambda x: isinstance(x, nn.RNNBase), self.module.modules()))
def _is_batch_first(self) -> bool:
# Default setting is batch_first=False for RNNBase subclasses
return any(map(lambda x: getattr(x, 'batch_first', True), self.module.modules()))
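# Illustrative usage sketch, mirroring the notebook's fully-connected cell.
# The layer sizes, learning rate, and epoch count are arbitrary choices.
#
#   net = nn.Sequential(nn.Linear(1, 4), nn.Tanh(), nn.Linear(4, 1))
#   est = TorchEstimator(module=net,
#                        optimizer=optim.Adam(net.parameters(), lr=0.01),
#                        loss=nn.MSELoss(), epochs=20, batch_size=10,
#                        verbose=False, cuda=False)
#   est.fit(X, y)      # X, y: float tensors of shape (N, Features)
#   est.score(X, y)    # coefficient of determination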
class _accumulator(torch.autograd.Function):
"""
Accumulates sums over a sequence. Takes inputs as tensors
of the shape `(SeqLen, N, Features)`. Output is of same shape.
"""
@staticmethod
# pylint: disable=not-callable
def forward(ctx, x, h0=torch.tensor(0)):
# TODO: Use native pytorch functions to sum sequence.
return h0 + np.cumsum(x.data, axis=0)
@staticmethod
def backward(ctx, grad_output):
# pylint: disable=no-member
return grad_output, torch.zeros(*grad_output.size()[1:], dtype=grad_output.dtype)
# Aliasing static class method for convenience
accumulator = _accumulator.apply
class Accumulator(nn.Module):
"""
A module form of the `accumulator` function. Sums a sequence.
Args:
* `batch_first`: If False, inputs/outputs are of shape (`Time, Batch,
Feature`) - default. If True, the shape is (`Batch, Time, Feature`).
"""
def __init__(self, batch_first: bool=False):
super().__init__()
self.batch_first = batch_first
# pylint: disable=not-callable
def forward(self, x: torch.Tensor, h0=torch.tensor(0)) -> torch.Tensor:
"""
Args:
* `x`: `torch.Tensor` of shape `(Time, Batch, Feature)` if `batch_first=
False` else of shape `(Batch, Time, Feature)`.
* `h0`: `torch.Tensor` of shape `(Batch, Feature)` containing the initial
state. Defaults to 0.
Returns:
* `torch.Tensor` of shape `(Time, Batch, Feature)` if `batch_first=
False` else of shape `(Batch, Time, Feature)`.
"""
if self.batch_first:
x = x.transpose(0, 1)
return accumulator(x, h0)
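# Illustrative check: with defaults, Accumulator is a cumulative sum over the
# time (first) dimension, matching the notebook's Accumulator cell.
#
#   acc = Accumulator()
#   acc(torch.ones(5, 1, 1)).squeeze()   # -> tensor([1., 2., 3., 4., 5.])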
class AffineIntegrator(nn.Module):
"""
A layer that integrates and affine-transforms a series:
y_t = y0 + (A * y_(t-1) + a) + (B * u_t + b)
Where `A, a, B, b` are learnable parameters. `y_t` is the output and `u_t`
is the input. `A, B` are square matrices, and `a, b` are bias vectors.
Args:
* `input_size`: Number of features in input time series,
* `batch_first`: If False, inputs/outputs are of shape (`Time, Batch,
Feature`) - default. If True, the shape is (`Batch, Time, Feature`).
* `fixed`: Whether the parameters can be learned or not. If `fixed=True`, then
`A, B` are identity matrices, and `a, b` are 0. This makes it a simple
integrator. If `fixed=False`, the initializations can be updated.
"""
# pylint: disable=no-member, not-callable
def __init__(self, input_size=1, batch_first=False, fixed=False):
super().__init__()
rnn1 = nn.RNN(input_size=input_size, hidden_size=input_size,
nonlinearity='relu', batch_first=batch_first)
rnn2 = nn.RNN(input_size=input_size, hidden_size=input_size,
nonlinearity='relu', batch_first=batch_first)
# We use 2 "identical" ReLU RNNs w/ flipped weights. Input-Hidden and
# Hidden-Hidden weights are identity matrices. This means that an RNN
# is basically summing a sequence.
# h_t = (1 * x_t) + (1 * h_(t-1))
# RNN1 has positive identity weights, so the ReLU lets positive sums
# through. __/
# RNN2 has negative identity weights, so the ReLU lets negative sums
# through. \__
# RNN1's hidden state maintains a +ive version of the sum and RNN2's
# hidden state maintains a -ive version. We finally subtract these two
# RNNs to reconstruct the actual sum:
# /
# -(\__) + (__/) = /
# /
# Each rnn specializes in positive/negative ranges of the integral.
# This means that RNN1's gradients are 0-ed out for negative integrals,
# and RNN2's gradients are 0-ed out for positive integrals. So as training
# progresses, both RNNs get different gradient updates, which causes
# their behaviour to diverge.
# TODO: Tie rnn weights together OR use an rnn w/ no activation, which
# sidesteps the need for 2 ReLU RNNs in the first place.
rnn1.weight_ih_l0.data = torch.eye(input_size)
rnn1.weight_hh_l0.data = torch.eye(input_size)
rnn1.bias_ih_l0.data = torch.zeros(input_size)
rnn1.bias_hh_l0.data = torch.zeros(input_size)
rnn2.weight_ih_l0.data = -torch.eye(input_size)
rnn2.weight_hh_l0.data = -torch.eye(input_size)
rnn2.bias_ih_l0.data = torch.zeros(input_size)
rnn2.bias_hh_l0.data = torch.zeros(input_size)
self.rnn1 = rnn1
self.rnn2 = rnn2
for p in self.parameters():
p.requires_grad = not fixed
# TODO: How to get rid of this dummy tensor? This is needed in the edge
# case this class is used as a stand-alone module with fixed=True and
# given to an Optimizer. It will complain that there are no learnable
# parameters.
self.dummy = nn.Parameter(torch.tensor(0.))
def forward(self, u: torch.Tensor, y0: torch.Tensor=None) -> torch.Tensor:
"""
Run input through the layer.
Args:
* `u`: `torch.Tensor` of shape `(Time, Batch, Feature)` if `batch_first=
False` else of shape `(Batch, Time, Feature)`.
* `y0`: `torch.Tensor` of shape `(Batch, Feature)` containing the initial
state. Defaults to 0.
Returns:
* `torch.Tensor` of shape `(Time, Batch, Feature)` if `batch_first=
False` else of shape `(Batch, Time, Feature)`.
"""
y1, _ = self.rnn1(u, y0)
y2, _ = self.rnn2(u, y0)
return (y1 - y2) + self.dummy
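# Illustrative check: with fixed=True and a nonnegative input sequence, the
# layer behaves like a plain integrator (compare the Accumulator check above).
#
#   integ = AffineIntegrator(input_size=1, fixed=True)
#   integ(torch.ones(10, 1, 1)).squeeze()   # -> tensor([1., 2., ..., 10.])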
class System(nn.Module):
"""
Models a dynamic system.
dx = F(x_t, u_t) -- (1)
x_t = x0 + (A * x_(t-1)) + (B * dx) -- (2)
y_t = G(x_t, u_t) -- (3)
Where `x` is the internal representation of the state of the system. `u` and
`y` are input and output sequences respectively.
Args:
* `dx`: a `Module` representing `F` in `(1)` which takes input sequences of
size `input_size + state_size` and outputs `state_size` feature sequences.
* `y`: a `Module` representing `G` in `(3)` which takes input sequences of
size `input_size + state_size` and outputs `output_size` feature sequences.
* `input_size`: Number of features in input sequences `u`,
* `state_size`: Number of features in internal state representation `x`. If
0, then internal state is a function of just the transformed input `F(u)`.
* `output_size`: Number of features in system output.
* `batch_first`: If False, inputs/outputs are of shape (`Time, Batch,
Feature`) - default. If True, the shape is (`Batch, Time, Feature`).
* `fixed`: Whether the parameters can be learned or not. _Note_: This only
affects parameters in `(2)`.
"""
# pylint: disable=no-member, not-callable
def __init__(self, dx: nn.Module=None, y: nn.Module=None, input_size: int=1,
state_size: int=0, output_size: int=1, batch_first: bool=False,
fixed: bool=False):
super().__init__()
self.batch_first = batch_first
self.state_size = state_size
self.input_size = input_size
self.output_size = output_size
repr_size = input_size if self.state_size == 0 else state_size
self.x1 = nn.RNNCell(input_size=repr_size,
hidden_size=repr_size,
bias=False, nonlinearity='relu')
self.x2 = nn.RNNCell(input_size=repr_size,
hidden_size=repr_size,
bias=False, nonlinearity='relu')
self.x1.weight_ih = nn.Parameter(torch.eye(repr_size, repr_size))
self.x1.weight_hh = nn.Parameter(torch.eye(repr_size, repr_size))
self.x2.weight_ih = nn.Parameter(-torch.eye(repr_size, repr_size))
self.x2.weight_hh = nn.Parameter(-torch.eye(repr_size, repr_size))
for p in self.parameters():
p.requires_grad = not fixed
self.dx = dx
self.y = y
def forward(self, u: torch.Tensor, x0: torch.Tensor=None) -> torch.Tensor:
"""
Run input through the layer.
Args:
* `u`: `torch.Tensor` of shape `(Time, Batch, Feature)` if `batch_first=
False` else of shape `(Batch, Time, Feature)`.
* `x0`: `torch.Tensor` of shape `(Batch, state_size)` containing the
initial state. Defaults to 0.
Returns:
* `torch.Tensor` of shape `(Time, Batch, Feature)` if `batch_first=
False` else of shape `(Batch, Time, Feature)`.
"""
L, N = self._size(u)
Y = torch.empty(L, N, self.output_size)
x = x0 if (x0 is not None)\
else torch.zeros(N, max(1, self.state_size))
x1, x2 = x, -x
for i, u_t in enumerate(self._sequence(u)):
if self.state_size == 0:
dx = self.dx(u_t) if self.dx is not None else u_t
x1 = self.x1(dx, x1)
x2 = self.x2(dx, x2)
x = x1 - x2
Y[i] = self.y(x) if self.y is not None else x
else:
x_u = torch.cat((x, u_t), dim=1)
dx = self.dx(x_u) if self.dx is not None else x_u
x1 = self.x1(dx, x1)
x2 = self.x2(dx, x2)
x = x1 - x2
x_u = torch.cat((x, u_t), dim=1)
Y[i] = self.y(x_u) if self.y is not None else x
return Y.transpose(0,1) if self.batch_first else Y
def _sequence(self, u: torch.Tensor) -> torch.Tensor:
# Switch axes so that first dimension is time
if self.batch_first:
return u.transpose(0, 1)
else:
return u
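# Illustrative usage sketch, mirroring the notebook's System cell. The linear
# layer sizes follow input_size=1, state_size=2, output_size=1 (so F and G
# each see state + input = 3 features).
#
#   AB = nn.Linear(3, 2)   # F in (1): (state, input) -> d(state)
#   CD = nn.Linear(3, 1)   # G in (3): (state, input) -> output
#   sys = System(dx=AB, y=CD, input_size=1, state_size=2, output_size=1)
#   u = torch.randn(100, 4, 1)   # (Time, Batch, Feature)
#   y = sys(u)                   # -> (Time, Batch, output_size)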