Commit c95e806c authored by hazrmard's avatar hazrmard
Browse files

refactored standard and neural network state space estimators

parent ce138d55
*.ini
*.ipynb
pytorchbridge.egg-info
__pycache__/
\ No newline at end of file
__pycache__/
.vscode/
.ipynb_checkpoints/
\ No newline at end of file
......@@ -34,7 +34,7 @@ Three classes are defined:
### `Trapezoid`
Uses trapezoid integration rule to integrate `(1)`. The `dxdt` and `y` methods need to be overridden.
Uses trapezoid integration rule to integrate `(1)`. `dxdt` and `y` methods can be overridden or provided as arguments.
### `LTISystem`
......
from .standard import Trapezoid
from .standard import LTISystem
from .standard import SS_ODE
from typing import Union, Callable, Tuple
from numbers import Number
import numpy as np
import torch
from torch import Tensor
import torch.nn as nn
class Trapezoid(nn.Module):
"""
A state-space model that uses trapezoid integration to predict state variables.
`dxdt` and `y` methods need to be overridden.
"""
def __init__(self, dims: int, outputs: int, dx: Union[Callable, nn.Module]=None,
out: Union[Callable, nn.Module]=None, tstep: float=1e-4):
"""
Arguments:
dims {int} -- Number of state-space variables.
outputs {int} -- Number of output variables.
dx {Callable} -- A function with a signature (time, x, u) -> np.ndarray
that returns dx/dt. `x, u` are vectors. `time` is a float.
out {Callable} -- A function with a signature (time, x, u) -> np.ndarray
that returns y. `x, u` are vectors. `time` is a float.
Keyword Arguments:
tstep {float} -- Minimum size of simulation step (default: {1e-4})
"""
super().__init__()
self.dims = dims
self.outputs = outputs
self.dx = dx
self.out = out
self.tstep = tstep
def create_parameters(self):
self.b_ux = nn.Parameter(Tensor(self.dims, 1))
self.w_ux = nn.Parameter(Tensor(self.dims, self.dims))
self.w_xx = nn.Parameter(Tensor(self.dims, self.dims))
def reset_parameters(self):
self.b_ux.data = torch.zeros(self.dims, 1)
self.w_ux.data = torch.eye(self.dims) * self.tstep
self.w_xx.data = torch.eye(self.dims)
def dxdt(self, t: float, x: np.ndarray, u: np.ndarray) -> np.ndarray:
if self.dxdt is not None:
return self.dx(t, x, u)
raise NotImplementedError
def y(self, t: float, x: np.ndarray, u: np.ndarray) -> np.ndarray:
if self.out is not None:
return self.out(t, x, u)
raise NotImplementedError
def _init(self, x0, u):
self.create_parameters()
self.reset_parameters()
if isinstance(self.dx, nn.Module):
self.dx_ = self.dx.forward
if isinstance(self.out, nn.Module):
self.out_ = self.out.forward
if isinstance(x0, torch.Tensor):
type_x0 = x0.dtype
elif isinstance(x0, np.ndarray):
type_x0 = torch.float32 if x0.dtype == np.float32 else torch.float64
else:
type_u = torch.float32
if isinstance(u, torch.Tensor):
type_u = u.dtype
elif isinstance(u, np.ndarray):
type_u = torch.float32 if u.dtype == np.float32 else torch.float64
else:
type_u = torch.float32
if type_u == torch.float64 or type_x0 == torch.float64:
self.type_ = torch.float64
else:
self.type_ = torch.float32
self.to(self.type_)
def predict(self, t: np.ndarray, x0: np.ndarray=None, u: np.ndarray=None) \
-> Tuple[Tensor, Tensor]:
"""
Generate system response to inputs. The system response is calculated through
trapezoid rule.
Arguments:
t {np.ndarray} -- The end time (float) or an array of time stamps for
the corresponding element in `u`.
x0 {np.ndarray} -- The initial state of the system.
u {np.ndarray} -- An `N x Inputs` array of inputs,
Returns:
np.ndarray -- array of shape `N x Outputs`,
"""
self._init(x0, u)
xdim = self.dims
if isinstance(t, Number):
t = torch.from_numpy(np.linspace(0, t, len(u) + 1, endpoint=True)).to(self.type_)
xt = (torch.zeros(xdim) if x0 is None else torch.tensor(x0)).to(self.type_)
X = torch.zeros((len(t), self.dims)).to(self.type_)
X[0] = xt
Y = torch.zeros((len(t), self.outputs)).to(self.type_)
Y[0] = self.y(t[0], X[0], u[0])
for i in range(1, len(t)):
Dt = t[i] - t[i - 1] # Time step in supplied data
dt = min(Dt, self.tstep) # Time step to take per calculation
while True:
dx = torch.tensor(self.dxdt(t[i] - Dt, xt, u[i - 1])).to(self.type_)
xt = self.w_xx @ xt + (self.w_ux * dt) @ dx
Dt -= dt
dt = min(Dt, self.tstep)
if dt == 0: break
X[i] = xt
Y[i] = self.y(t[i], xt, u[i - 1])
return X, Y
......@@ -2,7 +2,7 @@
Defines state space class for rough linear system modelling.
"""
from numbers import Number
from typing import Callable, Tuple, Any
from typing import Callable, Tuple, Union
import numpy as np
from scipy.integrate import odeint
......@@ -16,25 +16,36 @@ class Trapezoid:
"""
def __init__(self, dims: int, outputs: int, tstep: float=1e-4):
"""
def __init__(self, dims: int, outputs: int, dx: Callable=None, out: Callable=None,
tstep: float=1e-4):
"""
Arguments:
dims {int} -- Number of state-space variables.
outputs {int} -- Number of output variables.
dx {Callable} -- A function with a signature (time, x, u) -> np.ndarray
that returns dx/dt. `x, u` are vectors. `time` is a float.
out {Callable} -- A function with a signature (time, x, u) -> np.ndarray
that returns y. `x, u` are vectors. `time` is a float.
Keyword Arguments:
tstep {float} -- Minimum size of simulation step (default: {1e-4})
"""
self.dims = dims
self.outputs = outputs
self.dx = dx
self.out = out
self.tstep = tstep
def dxdt(self, t: float, x: np.ndarray, u: np.ndarray) -> np.ndarray:
if self.dxdt is not None:
return self.dx(t, x, u)
raise NotImplementedError
def y(self, t: float, x: np.ndarray, u: np.ndarray) -> np.ndarray:
if self.out is not None:
return self.out(t, x, u)
raise NotImplementedError
......@@ -53,11 +64,15 @@ class Trapezoid:
Returns:
np.ndarray -- array of shape `N x Outputs`,
"""
# t: 0 1 2 3 4 5
# x: x0 ( )
# u: u u u u u u
#
xdim = self.dims
if isinstance(t, Number):
t = np.linspace(0, t, len(u), endpoint=False)
t = np.linspace(0, t, len(u) + 1, endpoint=True)
xt = np.zeros(xdim) if x0 is None else x0
xt = np.zeros(xdim) if x0 is None else np.copy(x0)
X = np.zeros((len(t), self.dims))
X[0] = xt
Y = np.zeros((len(t), self.outputs))
......@@ -68,7 +83,7 @@ class Trapezoid:
Dt = t[i] - t[i - 1] # Time step in supplied data
dt = min(Dt, self.tstep) # Time step to take per calculation
while True:
dx = self.dxdt(t[i] - Dt, xt, u[i])
dx = self.dxdt(t[i] - Dt, xt, u[i - 1])
# xt += 0.5 * dt * (dx + dxprev)
xt += dt * dx
......@@ -76,7 +91,8 @@ class Trapezoid:
dt = min(Dt, self.tstep)
if dt == 0: break
X[i] = xt
Y[i] = self.y(t[i], xt, u[i])
Y[i] = self.y(t[i], xt, u[i - 1])
# return X[1:], Y
return X, Y
......@@ -117,39 +133,10 @@ class LTISystem(Trapezoid):
class SS_ODE:
def __init__(self, dims: int, outputs: int, dx: Callable, out: Callable,
tstep: float=1e-4):
"""
Arguments:
dims {int} -- Number of state-space variables.
outputs {int} -- Number of output variables.
dx {Callable} -- A function with a signature (time, x, u) -> np.ndarray
that returns dx/dt. `x, u` are vectors. `time` is a float.
out {Callable} -- A function with a signature (time, x, u) -> np.ndarray
that returns y. `x, u` are vectors. `time` is a float.
Keyword Arguments:
tstep {float} -- Maximum size of integration step (default: {1e-4})
"""
self.dx = dx
self.out = out
self.tstep = tstep
self.dims = dims
self.outputs = outputs
def dxdt(self, t: float, x: np.ndarray, u: np.ndarray) -> np.ndarray:
return self.dx(t, x, u)
def y(self, t: float, x: np.ndarray, u: np.ndarray) -> np.ndarray:
return self.out(t, x, u)
class SS_ODE(Trapezoid):
def predict(self, t: Any[float, np.ndarray], x0: np.ndarray, u: np.ndarray) \
def predict(self, t: Union[float, np.ndarray], x0: np.ndarray, u: np.ndarray) \
-> Tuple[np.ndarray, np.ndarray]:
if isinstance(t, Number):
t = np.linspace(0, t, len(u), endpoint=False)
......@@ -157,7 +144,7 @@ class SS_ODE:
u = np.concatenate(([u[0]-u[0]], u), axis=0)
X = np.zeros((len(t), self.dims))
Y = np.zeros((len(t), self.outputs))
X[0] = np.zeros(self.dims) if x0 is None else x0
X[0] = np.zeros(self.dims) if x0 is None else np.copy(x0)
Y[0] = self.y(t[0], X[0], u[0])
for idx in range(1, len(t)):
X[idx] = odeint(func=self.dxdt,
......
......@@ -18,7 +18,7 @@ class TestTrapezoid(TestCase):
def setUp(self):
class _Trapezoid(Trapezoid):
def dxdt(self, t, x, u):
return np.asarray([1., 1.])
return np.asarray([1., 1.]) + u
def y(self, t, x, u):
return 1*x
self.solver = _Trapezoid(dims=2, outputs=2, tstep=0.5)
......@@ -26,7 +26,7 @@ class TestTrapezoid(TestCase):
def test_integration(self):
u = np.zeros(10)
t = np.arange(10) * 2
t = np.arange(10).astype(np.float32)
x, y = self.solver.predict(t, np.asarray([0., 0.]), u)
self.assertEqual(len(y), len(t), 'Input/output length mismatch.')
......@@ -37,7 +37,7 @@ class TestSS_ODE(TestTrapezoid):
def setUp(self):
self.solver = SS_ODE(dims=2, outputs=2,
dx=lambda t, x, u: np.asarray([1., 1.]),
dx=lambda t, x, u: np.asarray([1., 1.]) + u,
out=lambda t, x, u: 1*x,
tstep=1.0)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment