# Copyright (c) 2020, Fabio Muratore, Honda Research Institute Europe GmbH, and
# Technical University of Darmstadt.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of Fabio Muratore, Honda Research Institute Europe GmbH,
# or Technical University of Darmstadt, nor the names of its contributors may
# be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL FABIO MURATORE, HONDA RESEARCH INSTITUTE EUROPE GMBH,
# OR TECHNICAL UNIVERSITY OF DARMSTADT BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from functools import reduce
from typing import Any, Callable, Sequence, Tuple, Union
from warnings import warn
import numpy as np
import torch as to
import pyrado
from pyrado.utils import get_class_name
from pyrado.utils.data_processing import normalize
from pyrado.utils.input_output import print_cbt
class FeatureStack:
    """
    Stack of feature functions whose outputs are concatenated along the last dimension.

    Features are nonlinear transformations of the inputs.

    .. note::
        We only consider 1-dim inputs.
    """

    def __init__(self, *feat_fcns: Callable[[to.Tensor], Any]):
        """
        Constructor

        :param feat_fcns: feature functions, each of them maps from a multi-dim input to a multi-dim output
                          (e.g. `identity_feat`, `squared_feat`, exception: `const_feat`)
        :raises pyrado.ShapeErr: if no feature function is given
        """
        self.feat_fcns = tuple(feat_fcns)
        if len(self.feat_fcns) < 1:
            raise pyrado.ShapeErr(given=self.feat_fcns, expected_match=(-1,))

    def __str__(self):
        """Get an information string."""
        # Plain functions carry a `__name__`; callable objects (e.g. feature classes) may not, so fall back to `str()`
        feat_fcn_names = [f.__name__ if hasattr(f, "__name__") else str(f) for f in self.feat_fcns]
        return f"{get_class_name(self)} [" + ", ".join(feat_fcn_names) + "]"

    def __call__(self, inp: to.Tensor) -> to.Tensor:
        """
        Evaluate the features for a given input.

        :param inp: input, i.e. observations in the RL setting
        :return: tensor holding the values of all features, concatenated along the last dimension and cast to the
                 data type of the input
        """
        feats_val = [fcn(inp) for fcn in self.feat_fcns]
        return to.cat(feats_val, dim=-1).to(dtype=inp.dtype)

    def get_num_feat(self, inp_flat_dim: int) -> int:
        """
        Calculate the number of features which depends on the dimension of the input and the selected feature functions.

        Every feature function contributes independently of the other functions in the stack.

        :param inp_flat_dim: flattened dimension input to the feature functions
        :return: number of feature values
        """
        num_feat = 0
        for fcn in self.feat_fcns:
            if fcn is const_feat:
                # The constant feature yields one value regardless of the number of observations
                num_feat += 1
            elif isinstance(fcn, RFFeat):
                # Random Fourier features define their own output dimension
                num_feat += fcn.num_feat_per_dim
            elif isinstance(fcn, RBFFeat):
                # Radial basis function features define their own output dimension
                num_feat += fcn.num_feat
            elif isinstance(fcn, (ATan2Feat, MultFeat)):
                # These features combine several input dimensions into one value
                num_feat += 1
            else:
                # All other feature functions are treated as element-wise, i.e. one value per input dimension
                num_feat += inp_flat_dim
        return num_feat
def const_feat(inp: to.Tensor) -> to.Tensor:
    """
    Constant feature (bias), i.e. a single one per input sample.

    :param inp: input, only its shape, data type, and device are used
    :return: tensor of ones with the leading (batch) dimensions of the input and a last dimension of size one
    """
    # Keep all leading dimensions such that the result can be concatenated with the other features along the last dim
    return inp.new_ones((*inp.shape[:-1], 1))
def identity_feat(inp: to.Tensor) -> to.Tensor:
    """
    Identity feature.

    :param inp: input tensor
    :return: copy of the input
    """
    return inp.clone()
def sign_feat(inp: to.Tensor) -> to.Tensor:
    """
    Sign feature.

    :param inp: input tensor
    :return: element-wise sign of the input
    """
    return to.sign(inp)
def abs_feat(inp: to.Tensor) -> to.Tensor:
    """
    Absolute value feature.

    :param inp: input tensor
    :return: element-wise absolute value of the input
    """
    return to.abs(inp)
def squared_feat(inp: to.Tensor) -> to.Tensor:
    """
    Quadratic feature.

    :param inp: input tensor
    :return: element-wise square of the input
    """
    return to.pow(inp, 2)
def cubic_feat(inp: to.Tensor) -> to.Tensor:
    """
    Cubic feature.

    :param inp: input tensor
    :return: element-wise third power of the input
    """
    return to.pow(inp, 3)
def sig_feat(inp: to.Tensor, scale: float = 1.0) -> to.Tensor:
    """
    Sigmoid feature.

    :param inp: input tensor
    :param scale: factor applied to the input before the sigmoid
    :return: element-wise sigmoid of the scaled input
    """
    return to.sigmoid(scale * inp)
def bell_feat(inp: to.Tensor, scale: float = 1.0) -> to.Tensor:
    """
    Bell-shaped (Gaussian-like) feature, evaluating `exp(-scale * inp^2)`.

    :param inp: input tensor
    :param scale: factor applied to the squared input
    :return: element-wise feature values
    """
    return to.exp(-scale * to.pow(inp, 2))
def sin_feat(inp: to.Tensor) -> to.Tensor:
    """
    Sine feature.

    :param inp: input tensor
    :return: element-wise sine of the input
    """
    return to.sin(inp)
def cos_feat(inp: to.Tensor) -> to.Tensor:
    """
    Cosine feature.

    :param inp: input tensor
    :return: element-wise cosine of the input
    """
    return to.cos(inp)
def sinsin_feat(inp: to.Tensor) -> to.Tensor:
    """
    Squared sine feature, evaluating `sin(inp) * sin(inp)`.

    :param inp: input tensor
    :return: element-wise feature values
    """
    # Evaluate the sine only once
    sin_inp = to.sin(inp)
    return sin_inp * sin_inp
def sincos_feat(inp: to.Tensor) -> to.Tensor:
    """
    Product of sine and cosine feature, evaluating `sin(inp) * cos(inp)`.

    :param inp: input tensor
    :return: element-wise feature values
    """
    sin_inp = to.sin(inp)
    cos_inp = to.cos(inp)
    return sin_inp * cos_inp
class MultFeat:
    """Feature that multiplies two or more dimensions of the given input / observation"""

    def __init__(self, idcs: Tuple):
        """
        Constructor

        :param idcs: indices of the dimensions to multiply
        :raises pyrado.TypeErr: if `idcs` is not a tuple
        :raises pyrado.ShapeErr: if less than two indices are given
        """
        if not isinstance(idcs, tuple):
            raise pyrado.TypeErr(given=idcs, expected_type=tuple)
        if not len(idcs) >= 2:
            raise pyrado.ShapeErr(msg="Provide at least two indices.")
        self._idcs = idcs

    def __str__(self):
        """Get an information string."""
        return f"{get_class_name(self)} (indices " + " ".join([str(i) for i in self._idcs]) + ")"

    def __call__(self, inp: to.Tensor) -> to.Tensor:
        """
        Evaluate the feature.

        :param inp: input i.e. observations in the RL setting, the selected dimensions are taken from the last axis
                    such that leading (batch) dimensions are preserved
        :return: feature value, the last dimension is of size one
        """
        # Index the last axis to support batched inputs, and add a trailing dimension for later concatenation
        return reduce(to.mul, [inp[..., i] for i in self._idcs]).unsqueeze(-1)
class ATan2Feat:
    """Feature that computes the atan2 from two dimensions of the given input / observation."""

    def __init__(self, idx_sin: int, idx_cos: int):
        """
        Constructor

        :param idx_sin: index of the numerator, i.e. the sin-transformed observation dimension
        :param idx_cos: index of the denominator, i.e. the cos-transformed observation dimension
        :raises pyrado.TypeErr: if one of the indices is not an integer
        """
        if not isinstance(idx_sin, int):
            raise pyrado.TypeErr(given=idx_sin, expected_type=int)
        if not isinstance(idx_cos, int):
            raise pyrado.TypeErr(given=idx_cos, expected_type=int)
        self._idx_sin = idx_sin
        self._idx_cos = idx_cos

    def __str__(self):
        """Get an information string."""
        return f"{get_class_name(self)} (index for numerator {self._idx_sin}, index for denominator {self._idx_cos})"

    def __call__(self, inp: to.Tensor) -> to.Tensor:
        """
        Evaluate the feature.

        :param inp: input i.e. observations in the RL setting, the selected dimensions are taken from the last axis
                    such that leading (batch) dimensions are preserved
        :return: feature value, the last dimension is of size one
        """
        # Index the last axis to support batched inputs, and add a trailing dimension for later concatenation
        return to.atan2(inp[..., self._idx_sin], inp[..., self._idx_cos]).unsqueeze(-1)
class RFFeat:
    """
    Random Fourier (RF) features

    .. seealso::
        [1] A. Rahimi and B. Recht "Random Features for Large-Scale Kernel Machines", NIPS, 2007
    """

    def __init__(
        self,
        inp_dim: int,
        num_feat_per_dim: int,
        bandwidth: Union[float, np.ndarray, to.Tensor],
        use_cuda: bool = False,
    ):
        r"""
        Gaussian kernel: $k(x,y) = \exp(-\sigma**2 / (2*d) * ||x-y||^2)$
        Sample from $\mathcal{N}(0,1)$ and scale the result by $\sigma / \sqrt{2*d}$

        :param inp_dim: flat dimension of the inputs i.e. the observations, called $d$ in [1]
        :param num_feat_per_dim: number of random Fourier features, called $D$ in [1]. In contrast to the `RBFFeat`
                                 class, the output dimensionality, thus the number of associated policy parameters is
                                 `num_feat_per_dim` and not`num_feat_per_dim * inp_dim`.
        :param bandwidth: scaling factor for the sampled frequencies. Pass a constant scalar value, for example
                          `env.obs_space.bound_up`. According to [1] and the note above we should use d here.
                          Actually, it is not a bandwidth since it is not a frequency.
        :param use_cuda: `True` to move the module to the GPU, `False` (default) to use the CPU
        """
        self.num_feat_per_dim = num_feat_per_dim
        self.scale = to.sqrt(to.tensor(2.0 / num_feat_per_dim))

        # Sample omega from a standardized normal distribution
        self.freq = to.randn(num_feat_per_dim, inp_dim)

        # Scale the frequency matrix with the bandwidth factor
        if not isinstance(bandwidth, to.Tensor):
            bandwidth = to.from_numpy(np.asanyarray(bandwidth))
        self.freq *= to.sqrt(to.tensor(2.0) / to.atleast_2d(bandwidth))

        # Sample b from a uniform distribution [0, 2pi]
        self.shift = 2 * np.pi * to.rand(num_feat_per_dim)

        # Select the device, falling back to the CPU if CUDA was requested but is not available
        if use_cuda and to.cuda.is_available():
            self._device = "cuda"
        else:
            if use_cuda:
                # The second argument of `warn` must be a warning category (a `Warning` subclass)
                warn("Tried to run on CUDA, but it is not available. Falling back to CPU.", RuntimeWarning)
            self._device = "cpu"

        # Move to the correct device
        self.scale = self.scale.to(device=self._device)
        self.freq = self.freq.to(device=self._device)
        self.shift = self.shift.to(device=self._device)

    def __call__(self, inp: to.Tensor) -> to.Tensor:
        """
        Evaluate the features, see [1].

        .. note::
            Only processing of 1-dim input (e.g., no images)! The input can be batched along the first dimension.

        :param inp: input i.e. observations in the RL setting
        :return: 2-dim tensor of shape [batch_size, num_feat_per_dim] with all feature values given the observations,
                 a 1-dim input is treated as a batch of size one
        :raises pyrado.ShapeErr: if the input has more than two dimensions
        """
        inp = inp.to(device=self._device, dtype=to.get_default_dtype())
        if inp.ndimension() > 2:
            raise pyrado.ShapeErr(msg="RFF class can only handle 1-dim or 2-dim input!")
        inp = to.atleast_2d(inp)  # shape of [batch_size, dim_input]

        # The shift of shape [num_feat_per_dim] is broadcasted over the batch dimension
        return self.scale * to.cos(to.mm(inp, self.freq.T) + self.shift)
class RBFFeat:
    """Normalized Gaussian radial basis function features"""

    def __init__(
        self,
        num_feat_per_dim: int,
        bounds: Sequence[Union[list, tuple, np.ndarray, to.Tensor, float]],
        scale: Union[float, to.Tensor, None] = None,
        state_wise_norm: bool = True,
        use_cuda: bool = False,
    ):
        """
        Constructor

        :param num_feat_per_dim: number of radial basis functions, identical for every dimension of the input
        :param bounds: lower and upper bound for the Gaussians' centers, the input dimension is inferred from them
        :param scale: scaling factor for the squared distance, if `None` the factor is determined such that two
                      neighboring RBFs have a value of 0.2 at the other center
        :param state_wise_norm: `True` to apply the normalization across input state dimensions separately (every
                                dimension sums to one), or `False` to jointly normalize them
        :param use_cuda: `True` to move the module to the GPU, `False` (default) to use the CPU
        :raises pyrado.ValueErr: if `num_feat_per_dim` is not greater than one
        :raises pyrado.ShapeErr: if `bounds` does not contain exactly two elements
        :raises pyrado.TypeErr: if one of the bounds has an unsupported type
        """
        if not num_feat_per_dim > 1:
            raise pyrado.ValueErr(given=num_feat_per_dim, g_constraint="1")
        if not len(bounds) == 2:
            raise pyrado.ShapeErr(given=bounds, expected_match=np.empty(2))

        # Get the bounds, e.g. from the observation space, as tensors
        bounds_to = [None, None]
        for i, b in enumerate(bounds):
            if isinstance(b, np.ndarray):
                bounds_to[i] = to.from_numpy(b)
            elif isinstance(b, to.Tensor):
                bounds_to[i] = b.clone()
            elif isinstance(b, (list, tuple)):
                bounds_to[i] = to.tensor(b, dtype=to.get_default_dtype())
            elif isinstance(b, (int, float)):
                bounds_to[i] = to.tensor(b, dtype=to.get_default_dtype()).view(1)
            else:
                raise pyrado.TypeErr(given=b, expected_type=[np.ndarray, to.Tensor, list, tuple, int, float])

        # Clip the bounds in case any of them is infinite, since the centers are spaced linearly between them
        if any(bool(to.isinf(b).any()) for b in bounds_to):
            bound_lo, bound_up = [to.clamp(b, min=-1e6, max=1e6) for b in bounds_to]
            print_cbt("Clipped the bounds of the RBF centers to [-1e6, 1e6].", "y")
        else:
            bound_lo, bound_up = bounds_to

        # Create a matrix with center locations for the Gaussians
        num_dim = len(bound_lo)
        self.num_feat = num_feat_per_dim * num_dim
        self.centers = to.empty(num_feat_per_dim, num_dim)
        for i in range(num_dim):
            # Features along columns
            self.centers[:, i] = to.linspace(float(bound_lo[i]), float(bound_up[i]), num_feat_per_dim)

        if scale is None:
            # Choose the scale such that an RBF has the value 0.2 at the center of its neighbor
            delta_center = self.centers[1, :] - self.centers[0, :]
            self.scale = -to.log(to.tensor(0.2)) / to.pow(delta_center, 2)
        else:
            self.scale = to.as_tensor(scale, dtype=to.get_default_dtype())

        self._state_wise_norm = state_wise_norm

        # Select the device, falling back to the CPU if CUDA was requested but is not available
        if use_cuda and to.cuda.is_available():
            self._device = "cuda"
        else:
            if use_cuda:
                # The second argument of `warn` must be a warning category (a `Warning` subclass)
                warn("Tried to run on CUDA, but it is not available. Falling back to CPU.", RuntimeWarning)
            self._device = "cpu"

        # Move to the correct device
        self.centers = self.centers.to(device=self._device)
        self.scale = self.scale.to(device=self._device)

    def _dist_to_centers(self, inp: to.Tensor) -> to.Tensor:
        """
        Move the input to the feature's device and compute the signed distances to all centers.

        :param inp: 1-dim or 2-dim input, the first dimension of a 2-dim input is the batch size
        :return: tensor of shape [batch_size, num_feat_per_dim, num_dim] holding `inp - centers`
        :raises pyrado.ShapeErr: if the input has more than two dimensions
        """
        inp = inp.to(device=self._device, dtype=to.get_default_dtype())
        if inp.ndimension() > 2:
            raise pyrado.ShapeErr(msg="RBF class can only handle 1-dim or 2-dim input!")
        inp = to.atleast_2d(inp)  # first dim is the batch size, the second dim is the actual input dimension

        # Broadcast [batch_size, 1, num_dim] against the centers of shape [num_feat_per_dim, num_dim]
        return inp.unsqueeze(1) - self.centers

    def _normalize_and_reshape(self, inp: to.Tensor) -> to.Tensor:
        """
        Normalize (depending on `state_wise_norm`) and reshape the input.

        :param inp: input tensor of exponentiated squared distances
        :return: feature value
        """
        if self._state_wise_norm:
            # Normalize the features such that the activation for every state dimension sums up to one
            return normalize(inp, axis=0, order=1).t().reshape(-1)
        else:
            # Turn the features into a vector and normalize over all of them
            return normalize(inp.t().reshape(-1), axis=-1, order=1)

    def __call__(self, inp: to.Tensor) -> to.Tensor:
        """
        Evaluate the features and normalize them.

        .. note::
            Only processing of 1-dim input (e.g., no images)! The input can be batched along the first dimension.

        :param inp: input i.e. observations in the RL setting
        :return: tensor with one row of normalized feature values per sample, a 1-dim input is treated as a batch of
                 size one
        """
        # Exponentiate the squared distances
        exp_sq_dist = to.exp(-self.scale * to.pow(self._dist_to_centers(inp), 2))

        # Normalize, reshape, and return the feature values
        return to.stack([self._normalize_and_reshape(esd) for esd in exp_sq_dist])

    def derivative(self, inp: to.Tensor) -> to.Tensor:
        """
        Compute the derivative of the features w.r.t. the inputs.

        With the normalized features `phi = e / sum(e)` and `g = -2 * scale * (inp - centers)`, every entry is
        `phi * (g - sum(phi * g))`, where the sums run over all features that share one normalization constant, i.e.
        over the features of the same input dimension for the state-wise normalization, and over all features for
        the joint normalization. The entries are ordered like the feature values returned by `__call__`.

        .. note::
            Only processing of 1-dim input (e.g., no images)! The input can be batched along the first dimension.

        :param inp: input i.e. observations in the RL setting
        :return: tensor with one row of feature derivatives per sample
        """
        dist = self._dist_to_centers(inp)
        exp_sq_dist = to.exp(-self.scale * to.pow(dist, 2))
        exp_sq_dist_d = -2 * self.scale * dist  # derivative of the exponent, i.e. of the log of the unnormalized RBFs

        feat_val_dot = []
        for sample, sample_d in zip(exp_sq_dist, exp_sq_dist_d):
            if self._state_wise_norm:
                # Normalize the features such that the activation for every state dimension sums up to one
                feat_val = normalize(sample, axis=0, order=1)
                weighted = sample_d * feat_val
                # The normalization constant of a column only depends on that column's input dimension
                deriv = weighted - feat_val * weighted.sum(dim=0, keepdim=True)
                feat_val_dot.append(deriv.t().reshape(-1))
            else:
                # Turn the features into a vector and normalize over all of them
                feat_val = normalize(sample.t().reshape(-1), axis=-1, order=1)
                # Flatten the derivative in the same order as the features before multiplying element-wise
                weighted = sample_d.t().reshape(-1) * feat_val
                feat_val_dot.append(weighted - feat_val * weighted.sum())

        return to.stack(feat_val_dot)