Source code for pyrado.policies.feed_back.two_headed_fnn

# Copyright (c) 2020, Fabio Muratore, Honda Research Institute Europe GmbH, and
# Technical University of Darmstadt.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of Fabio Muratore, Honda Research Institute Europe GmbH,
#    or Technical University of Darmstadt, nor the names of its contributors may
#    be used to endorse or promote products derived from this software without
#    specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL FABIO MURATORE, HONDA RESEARCH INSTITUTE EUROPE GMBH,
# OR TECHNICAL UNIVERSITY OF DARMSTADT BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from typing import Callable, Sequence, Tuple

import torch as to
import torch.nn as nn

from pyrado.policies.base import TwoHeadedPolicy
from pyrado.policies.feed_back.fnn import FNN
from pyrado.policies.initialization import init_param
from pyrado.utils.data_types import EnvSpec


[docs]class TwoHeadedFNNPolicy(TwoHeadedPolicy): """Policy architecture which has a common body and two heads that have a separate last layer""" name: str = "thfnn" def __init__( self, spec: EnvSpec, shared_hidden_sizes: Sequence[int], shared_hidden_nonlin: [Callable, Sequence[Callable]], head_1_size: int = None, head_2_size: int = None, head_1_output_nonlin: Callable = None, head_2_output_nonlin: Callable = None, shared_dropout: float = 0.0, init_param_kwargs: dict = None, use_cuda: bool = False, ): """ Constructor :param spec: environment specification :param shared_hidden_sizes: sizes of shared hidden layer outputs. Every entry creates one shared hidden layer. :param shared_hidden_nonlin: nonlinearity for the shared hidden layers :param head_1_size: size of the fully connected layer for head 1, if `None` this is set to the action space dim :param head_2_size: size of the fully connected layer for head 2, if `None` this is set to the action space dim :param head_1_output_nonlin: nonlinearity for output layer of the first head :param head_2_output_nonlin: nonlinearity for output layer of the second head :param shared_dropout: dropout probability, default = 0 deactivates dropout :param init_param_kwargs: additional keyword arguments for the policy parameter initialization :param use_cuda: `True` to move the policy to the GPU, `False` (default) to use the CPU """ super().__init__(spec, use_cuda) # Create the feed-forward neural network self.shared = FNN( input_size=spec.obs_space.flat_dim, output_size=shared_hidden_sizes[-1], hidden_sizes=shared_hidden_sizes, hidden_nonlin=shared_hidden_nonlin, dropout=shared_dropout, output_nonlin=None, use_cuda=use_cuda, ) # Create output layer head_1_size = spec.act_space.flat_dim if head_1_size is None else head_1_size head_2_size = spec.act_space.flat_dim if head_2_size is None else head_2_size self.head_1 = nn.Linear(shared_hidden_sizes[-1], head_1_size) self.head_2 = nn.Linear(shared_hidden_sizes[-1], head_2_size) self.head_1_output_nonlin = head_1_output_nonlin self.head_2_output_nonlin = head_2_output_nonlin # Call custom initialization function after PyTorch network parameter initialization init_param_kwargs = init_param_kwargs if init_param_kwargs is not None else dict() self.init_param(None, **init_param_kwargs) self.to(self.device)
[docs] def init_param(self, init_values: to.Tensor = None, **kwargs): if init_values is None: self.shared.init_param(None, **kwargs) init_param(self.head_1, **kwargs) init_param(self.head_2, **kwargs) else: self.param_values = init_values
[docs] def forward(self, obs: to.Tensor) -> Tuple[to.Tensor, to.Tensor]: obs = obs.to(device=self.device) # Get the output of the last shared layer and pass this to the two headers separately x = self.shared(obs) output_1 = self.head_1(x) output_2 = self.head_2(x) if self.head_1_output_nonlin is not None: output_1 = self.head_1_output_nonlin(output_1) if self.head_2_output_nonlin is not None: output_2 = self.head_2_output_nonlin(output_2) return output_1, output_2