# Copyright (c) 2020, Fabio Muratore, Honda Research Institute Europe GmbH, and
# Technical University of Darmstadt.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of Fabio Muratore, Honda Research Institute Europe GmbH,
# or Technical University of Darmstadt, nor the names of its contributors may
# be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL FABIO MURATORE, HONDA RESEARCH INSTITUTE EUROPE GMBH,
# OR TECHNICAL UNIVERSITY OF DARMSTADT BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os.path as osp
import numpy as np
import pytest
import torch as to
from tests.conftest import m_needs_bullet, m_needs_cuda, m_needs_libtorch, m_needs_mujoco, m_needs_rcs
from torch import nn as nn
import pyrado
from pyrado.environments.base import Env
from pyrado.policies.base import Policy
from pyrado.policies.features import (
FeatureStack,
MultFeat,
RBFFeat,
RFFeat,
abs_feat,
bell_feat,
const_feat,
cos_feat,
cubic_feat,
identity_feat,
sig_feat,
sign_feat,
sin_feat,
sincos_feat,
sinsin_feat,
squared_feat,
)
from pyrado.policies.feed_back.dual_rfb import DualRBFLinearPolicy
from pyrado.policies.feed_back.linear import LinearPolicy
from pyrado.policies.feed_forward.playback import PlaybackPolicy
from pyrado.policies.feed_forward.poly_time import PolySplineTimePolicy
from pyrado.policies.recurrent.base import RecurrentPolicy, default_pack_hidden, default_unpack_hidden
from pyrado.policies.recurrent.two_headed_rnn import TwoHeadedRNNPolicyBase
from pyrado.policies.special.environment_specific import (
QBallBalancerPDCtrl,
QCartPoleSwingUpAndBalanceCtrl,
QQubeSwingUpAndBalanceCtrl,
)
from pyrado.sampling.rollout import rollout
from pyrado.sampling.step_sequence import StepSequence
from pyrado.utils.data_types import RenderMode
from pyrado.utils.nn_layers import IndiNonlinLayer


@pytest.mark.features
@pytest.mark.parametrize(
"feat_list",
[
[const_feat],
[identity_feat],
[
const_feat,
identity_feat,
sign_feat,
abs_feat,
squared_feat,
cubic_feat,
sig_feat,
bell_feat,
sin_feat,
cos_feat,
sinsin_feat,
sincos_feat,
],
],
ids=["const_only", "ident_only", "all_simple_feats"],
)
def test_simple_feature_stack(feat_list: list):
fs = FeatureStack(*feat_list)
obs = to.randn(1)
feats_val = fs(obs)
assert isinstance(feats_val, to.Tensor)


@pytest.mark.features
@pytest.mark.parametrize("obs_dim, idcs", [(2, (0, 1)), (3, (2, 0)), (10, (0, 1, 5, 6))], ids=["2_2", "3_2", "10_4"])
def test_mul_feat(obs_dim: int, idcs: tuple):
fs = FeatureStack(identity_feat, MultFeat(idcs=idcs))
obs = to.randn(obs_dim)
feats_val = fs(obs)
assert isinstance(feats_val, to.Tensor)
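    # identity_feat passes through all obs_dim entries, and MultFeat adds one extra feature (the product of the selected entries)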
assert len(feats_val) == obs_dim + 1


@pytest.mark.features
@pytest.mark.parametrize(
"obs_dim, num_feat_per_dim", [(1, 1), (2, 1), (1, 4), (2, 4), (10, 100)], ids=["1_1", "2_1", "1_4", "2_4", "10_100"]
)
def test_rff_feat_serial(obs_dim: int, num_feat_per_dim: int):
rff = RFFeat(
inp_dim=obs_dim,
num_feat_per_dim=num_feat_per_dim,
bandwidth=np.ones(obs_dim),
)
fs = FeatureStack(rff)
for _ in range(10):
obs = to.randn(obs_dim)
feats_val = fs(obs)
assert isinstance(feats_val, to.Tensor)
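        # RFFeat maps the whole observation to num_feat_per_dim random Fourier features and adds a leading batch dimension of size 1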
assert feats_val.shape == (1, num_feat_per_dim)


@pytest.mark.features
@pytest.mark.parametrize("batch_size", [1, 2, 100], ids=["1", "2", "100"])
@pytest.mark.parametrize(
"obs_dim, num_feat_per_dim", [(1, 1), (2, 1), (1, 4), (2, 4), (10, 100)], ids=["1_1", "2_1", "1_4", "2_4", "10_100"]
)
def test_rff_feat_batched(batch_size: int, obs_dim: int, num_feat_per_dim: int):
rff = RFFeat(
inp_dim=obs_dim,
num_feat_per_dim=num_feat_per_dim,
bandwidth=np.ones(obs_dim),
)
fs = FeatureStack(rff)
for _ in range(10):
obs = to.randn(batch_size, obs_dim)
feats_val = fs(obs)
assert isinstance(feats_val, to.Tensor)
assert feats_val.shape == (batch_size, num_feat_per_dim)


@pytest.mark.features
@pytest.mark.parametrize(
"obs_dim, num_feat_per_dim, bounds",
[
(1, 4, (to.tensor([-3.0]), to.tensor([3.0]))),
(1, 4, (np.array([-3.0]), np.array([3.0]))),
(2, 4, (to.tensor([-3.0, -4.0]), to.tensor([3.0, 4.0]))),
(10, 100, (to.tensor([-3.0] * 10), to.tensor([3.0] * 10))),
],
ids=["1_4_to", "1_4_np", "2_4", "10_100"],
)
def test_rbf_serial(obs_dim: int, num_feat_per_dim: int, bounds: to.Tensor):
rbf = RBFFeat(num_feat_per_dim=num_feat_per_dim, bounds=bounds)
fs = FeatureStack(rbf)
for _ in range(10):
obs = to.randn(obs_dim) # 1-dim obs vector
feats_val = fs(obs)
assert isinstance(feats_val, to.Tensor)
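        # RBFFeat places num_feat_per_dim Gaussian basis functions per input dimension, so the feature vector has obs_dim * num_feat_per_dim entries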
assert feats_val.shape == (1, obs_dim * num_feat_per_dim)


@pytest.mark.features
@pytest.mark.parametrize("batch_size", [1, 2, 100], ids=["1", "2", "100"])
@pytest.mark.parametrize(
"obs_dim, num_feat_per_dim, bounds",
[
(1, 4, (to.tensor([-3.0]), to.tensor([3.0]))),
(1, 4, (np.array([-3.0]), np.array([3.0]))),
(2, 4, (to.tensor([-3.0, -4.0]), to.tensor([3.0, 4.0]))),
(10, 100, (to.tensor([-3.0] * 10), to.tensor([3.0] * 10))),
],
ids=["1_4_to", "1_4_np", "2_4", "10_100"],
)
def test_rbf_feat_batched(batch_size: int, obs_dim: int, num_feat_per_dim: int, bounds: to.Tensor):
rbf = RBFFeat(num_feat_per_dim=num_feat_per_dim, bounds=bounds)
fs = FeatureStack(rbf)
for _ in range(10):
obs = to.randn(batch_size, obs_dim) # 2-dim obs array
feats_val = fs(obs)
assert isinstance(feats_val, to.Tensor)
assert feats_val.shape == (batch_size, obs_dim * num_feat_per_dim)


@pytest.mark.features
@pytest.mark.parametrize(
"env",
[
"default_bob",
"default_qqsu",
"default_qbb",
pytest.param("default_bop5d_bt", marks=m_needs_bullet),
],
ids=["bob", "qq-st", "qbb", "bop5D"],
indirect=True,
)
@pytest.mark.parametrize("num_feat_per_dim", [4, 100], ids=["4", "100"])
def test_rff_policy_serial(env: Env, num_feat_per_dim: int):
rff = RFFeat(inp_dim=env.obs_space.flat_dim, num_feat_per_dim=num_feat_per_dim, bandwidth=env.obs_space.bound_up)
policy = LinearPolicy(env.spec, FeatureStack(rff))
for _ in range(10):
obs = env.obs_space.sample_uniform()
obs = to.from_numpy(obs).to(dtype=to.get_default_dtype())
act = policy(obs)
assert act.shape == (env.act_space.flat_dim,)


@pytest.mark.features
@pytest.mark.parametrize(
"env",
[
"default_bob",
"default_qqsu",
"default_qbb",
pytest.param("default_bop5d_bt", marks=m_needs_bullet),
],
ids=["bob", "qq-su", "qbb", "bop5D"],
indirect=True,
)
@pytest.mark.parametrize(
"batch_size, num_feat_per_dim", [(1, 4), (20, 4), (1, 100), (20, 100)], ids=["1_4", "20_4", "1_100", "20_100"]
)
def test_rff_policy_batch(env: Env, batch_size: int, num_feat_per_dim: int):
rff = RFFeat(inp_dim=env.obs_space.flat_dim, num_feat_per_dim=num_feat_per_dim, bandwidth=env.obs_space.bound_up)
policy = LinearPolicy(env.spec, FeatureStack(rff))
for _ in range(10):
obs = env.obs_space.sample_uniform()
obs = to.from_numpy(obs).to(dtype=to.get_default_dtype())
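        # Repeat the identical observation to form a batch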
obs = obs.repeat(batch_size, 1)
act = policy(obs)
assert act.shape == (batch_size, env.act_space.flat_dim)


@pytest.mark.features
@pytest.mark.parametrize(
"env",
[
"default_bob",
"default_qqsu",
"default_qbb",
pytest.param("default_bop5d_bt", marks=m_needs_bullet),
],
ids=["bob", "qq-su", "qbb", "bop5D"],
indirect=True,
)
@pytest.mark.parametrize("num_feat_per_dim", [4, 100], ids=["4", "100"])
def test_rfb_policy_serial(env: Env, num_feat_per_dim: int):
rbf = RBFFeat(num_feat_per_dim=num_feat_per_dim, bounds=env.obs_space.bounds)
fs = FeatureStack(rbf)
policy = LinearPolicy(env.spec, fs)
for _ in range(10):
obs = env.obs_space.sample_uniform()
obs = to.from_numpy(obs).to(dtype=to.get_default_dtype())
act = policy(obs)
assert act.shape == (env.act_space.flat_dim,)


@pytest.mark.features
@pytest.mark.parametrize(
"env",
[
"default_bob",
"default_qqsu",
"default_qbb",
pytest.param("default_bop5d_bt", marks=m_needs_bullet),
],
ids=["bob", "qq-su", "qbb", "bop5D"],
indirect=True,
)
@pytest.mark.parametrize(
"batch_size, num_feat_per_dim", [(1, 4), (20, 4), (1, 100), (20, 100)], ids=["1_4", "20_4", "1_100", "20_100"]
)
def test_rfb_policy_batch(env: Env, batch_size: int, num_feat_per_dim: int):
rbf = RBFFeat(num_feat_per_dim=num_feat_per_dim, bounds=env.obs_space.bounds)
fs = FeatureStack(rbf)
policy = LinearPolicy(env.spec, fs)
for _ in range(10):
obs = env.obs_space.sample_uniform()
obs = to.from_numpy(obs).to(dtype=to.get_default_dtype())
obs = obs.repeat(batch_size, 1)
act = policy(obs)
assert act.shape == (batch_size, env.act_space.flat_dim)


@pytest.mark.features
@pytest.mark.parametrize(
"env",
[
pytest.param("default_wambic", marks=m_needs_mujoco), # so far, the only use case
],
ids=["wambic"],
indirect=True,
)
@pytest.mark.parametrize("dim_mask", [0, 1, 2], ids=["0", "1", "2"])
def test_dualrbf_policy(env: Env, dim_mask: int):
# Hyper-parameters for the RBF features are not important here
rbf_hparam = dict(num_feat_per_dim=7, bounds=(np.array([0.0]), np.array([1.0])), scale=None)
policy = DualRBFLinearPolicy(env.spec, rbf_hparam, dim_mask)
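    # DualRBFLinearPolicy shares one set of weights between the position and velocity half of the action space, hence the factor 1/2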
assert policy.num_param == policy.num_active_feat * env.act_space.flat_dim // 2
ro = rollout(env, policy, eval=True)
assert isinstance(ro, StepSequence)


@pytest.mark.parametrize(
"env",
["default_qbb", "default_qcpsu", "default_qcpst", "default_qqsu", "default_qqst"],
ids=["qbb", "qcpsu", "qcpst", "qqsu", "qqst"],
indirect=True,
)
def test_env_specific(env: Env):
pyrado.set_seed(0)
if "qbb" in env.name:
policy = QBallBalancerPDCtrl(env.spec)
policy.reset()
elif "qcp" in env.name:
policy = QCartPoleSwingUpAndBalanceCtrl(env.spec)
policy.reset()
elif "qq" in env.name:
policy = QQubeSwingUpAndBalanceCtrl(env.spec)
policy.reset()
else:
raise NotImplementedError
    # Sample an observation and compute an action 10 times
for _ in range(10):
obs = env.obs_space.sample_uniform()
obs = to.from_numpy(obs).to(dtype=to.get_default_dtype())
act = policy(obs)
assert isinstance(act, to.Tensor)
[docs]@pytest.mark.parametrize("env", ["default_bob", "default_qbb"], ids=["bob", "qbb"], indirect=True)
@pytest.mark.parametrize(
"policy",
[
"linear_policy",
"fnn_policy",
"rnn_policy",
"lstm_policy",
"gru_policy",
"adn_policy",
"nf_policy",
"thfnn_policy",
"thgru_policy",
],
ids=["lin", "fnn", "rnn", "lstm", "gru", "adn", "nf", "thfnn", "thgru"],
indirect=True,
)
def test_parameterized_policies_init_param(env: Env, policy: Policy):
some_values = to.ones_like(policy.param_values)
policy.init_param(some_values)
to.testing.assert_allclose(policy.param_values, some_values)
[docs]@pytest.mark.parametrize("env", ["default_bob", "default_qbb"], ids=["bob", "qbb"], indirect=True)
@pytest.mark.parametrize(
"policy",
["idle_policy", "dummy_policy", "linear_policy", "fnn_policy"],
ids=["idle", "dummy", "lin", "fnn"],
indirect=True,
)
def test_feedforward_policy_one_step(env: Env, policy: Policy):
obs = env.spec.obs_space.sample_uniform()
obs = to.from_numpy(obs).to(dtype=to.get_default_dtype())
act = policy(obs)
assert isinstance(act, to.Tensor)
[docs]@pytest.mark.parametrize("env", ["default_bob", "default_qbb"], ids=["bob", "qbb"], indirect=True)
@pytest.mark.parametrize(
"policy",
[
"time_policy",
"traced_time_policy",
"pst_policy",
"traced_pst_policy",
],
ids=["time", "tracedtime", "pst", "tracedpst"],
indirect=True,
)
def test_time_policy_one_step(env: Env, policy: Policy):
policy.reset()
obs = env.obs_space.sample_uniform()
obs = to.from_numpy(obs)
act = policy(obs)
assert isinstance(act, to.Tensor)


@pytest.mark.recurrent_policy
@pytest.mark.parametrize("env", ["default_bob", "default_qbb"], ids=["bob", "qbb"], indirect=True)
@pytest.mark.parametrize(
"policy",
[
"rnn_policy",
"lstm_policy",
"gru_policy",
"adn_policy",
"nf_policy",
"thgru_policy",
],
ids=["rnn", "lstm", "gru", "adn", "nf", "thgru"],
indirect=True,
)
def test_recurrent_policy_one_step_simple(env: Env, policy: Policy):
hid = policy.init_hidden()
obs = env.obs_space.sample_uniform()
obs = to.from_numpy(obs).to(dtype=to.get_default_dtype())
if isinstance(policy, TwoHeadedRNNPolicyBase):
act, out2, hid = policy(obs, hid)
assert isinstance(out2, to.Tensor)
else:
act, hid = policy(obs, hid)
assert isinstance(act, to.Tensor) and isinstance(hid, to.Tensor)


@pytest.mark.parametrize(
"env",
[
"default_bob",
"default_qbb",
pytest.param("default_bop5d_bt", marks=m_needs_bullet),
],
ids=["bob", "qbb", "bop5D"],
indirect=True,
)
@pytest.mark.parametrize(
"policy",
[
# dummy_policy and idle_policy are not supported
"linear_policy",
"fnn_policy",
],
ids=["lin", "fnn"],
indirect=True,
)
@pytest.mark.parametrize("batch_size", [1, 2, 3])
def test_feedforward_policy_batching(env: Env, policy: Policy, batch_size: int):
    obs = np.stack([policy.env_spec.obs_space.sample_uniform() for _ in range(batch_size)])  # shape = (batch_size, obs_dim)
obs = to.from_numpy(obs).to(dtype=to.get_default_dtype())
act = policy(obs)
assert act.shape[0] == batch_size


@pytest.mark.recurrent_policy
@pytest.mark.parametrize(
"env",
[
"default_bob",
"default_qbb",
pytest.param("default_bop5d_bt", marks=m_needs_bullet),
],
ids=["bob", "qbb", "bop5D"],
indirect=True,
)
@pytest.mark.parametrize(
"policy",
[
"rnn_policy",
"lstm_policy",
"gru_policy",
"adn_policy",
"nf_policy",
"thrnn_policy",
"thgru_policy",
"thlstm_policy",
],
ids=["rnn", "lstm", "gru", "adn", "nf", "thgrnn", "thgru", "thlstm"],
indirect=True,
)
@pytest.mark.parametrize("batch_size", [1, 2, 4, 256])
def test_recurrent_policy_batching(env: Env, policy: Policy, batch_size: int):
assert policy.is_recurrent
    obs = np.stack([policy.env_spec.obs_space.sample_uniform() for _ in range(batch_size)])  # shape = (batch_size, obs_dim)
obs = to.from_numpy(obs).to(dtype=to.get_default_dtype())
    # Do this in evaluation mode to disable dropout & co
policy.eval()
# Create initial hidden state
hidden = policy.init_hidden(batch_size)
# Use a random one to ensure we don't just run into the 0-special-case
hidden.random_()
assert hidden.shape == (batch_size, policy.hidden_size)
if isinstance(policy, TwoHeadedRNNPolicyBase):
act, _, hid_new = policy(obs, hidden)
else:
act, hid_new = policy(obs, hidden)
assert hid_new.shape == (batch_size, policy.hidden_size)
if batch_size > 1:
# Try to use a subset of the batch
subset = to.arange(batch_size // 2)
if isinstance(policy, TwoHeadedRNNPolicyBase):
act_sub, _, hid_sub = policy(obs[subset, :], hidden[subset, :])
else:
act_sub, hid_sub = policy(obs[subset, :], hidden[subset, :])
to.testing.assert_allclose(act_sub, act[subset, :])
to.testing.assert_allclose(hid_sub, hid_new[subset, :])


@pytest.mark.recurrent_policy
@pytest.mark.parametrize(
"env",
["default_bob", "default_qbb", pytest.param("default_bop5d_bt", marks=m_needs_bullet)],
ids=["bob", "qbb", "bop5d"],
indirect=True,
)
@pytest.mark.parametrize(
"policy",
[
"rnn_policy",
"lstm_policy",
"gru_policy",
"adn_policy",
"nf_policy",
"thrnn_policy",
"thgru_policy",
"thlstm_policy",
],
ids=["rnn", "lstm", "gru", "adn", "nf", "thgrnn", "thgru", "thlstm"],
indirect=True,
)
def test_pytorch_recurrent_policy_rollout(env: Env, policy: Policy):
ro = rollout(env, policy, render_mode=RenderMode())
assert isinstance(ro, StepSequence)


@pytest.mark.recurrent_policy
@pytest.mark.parametrize(
"env",
["default_bob", "default_qbb", pytest.param("default_bop5d_bt", marks=m_needs_bullet)],
ids=["bob", "qbb", "bop5d"],
indirect=True,
)
@pytest.mark.parametrize(
"policy",
[
"rnn_policy",
"lstm_policy",
"gru_policy",
"adn_policy",
"nf_policy",
"thrnn_policy",
"thgru_policy",
"thlstm_policy",
],
ids=["rnn", "lstm", "gru", "adn", "nf", "thgrnn", "thgru", "thlstm"],
indirect=True,
)
def test_recurrent_policy_one_step(env: Env, policy: Policy):
assert policy.is_recurrent
obs = policy.env_spec.obs_space.sample_uniform()
obs = to.from_numpy(obs).to(dtype=to.get_default_dtype())
# Do this in evaluation mode to disable dropout & co
policy.eval()
# Create initial hidden state
hidden = policy.init_hidden()
# Use a random one to ensure we don't just run into the 0-special-case
hidden = to.rand_like(hidden)
assert len(hidden) == policy.hidden_size
# Test general conformity
if isinstance(policy, TwoHeadedRNNPolicyBase):
act, otherhead, hid_new = policy(obs, hidden)
assert len(hid_new) == policy.hidden_size
else:
act, hid_new = policy(obs, hidden)
assert len(hid_new) == policy.hidden_size
# Test reproducibility
if isinstance(policy, TwoHeadedRNNPolicyBase):
act2, otherhead2, hid_new2 = policy(obs, hidden)
to.testing.assert_allclose(act, act2)
to.testing.assert_allclose(otherhead, otherhead2)
        to.testing.assert_allclose(hid_new, hid_new2)
else:
act2, hid_new2 = policy(obs, hidden)
to.testing.assert_allclose(act, act2)
        to.testing.assert_allclose(hid_new, hid_new2)


@pytest.mark.recurrent_policy
@pytest.mark.parametrize(
"env",
["default_pend", "default_qbb"],
ids=["pend", "qbb"],
indirect=True,
)
@pytest.mark.parametrize(
"policy",
["rnn_policy", "lstm_policy", "gru_policy"],
ids=["rnn", "lstm", "gru"],
indirect=True,
)
def test_basic_policy_evaluate_packed_padded_sequences(env: Env, policy: RecurrentPolicy):
# Test packed padded sequence implementation against old implementation
def old_evaluate(rollout: StepSequence, hidden_states_name: str = "hidden_states") -> to.Tensor:
# Set policy, i.e. PyTorch nn.Module, to evaluation mode
policy.eval()
# The passed sample collection might contain multiple rollouts.
act_list = []
for ro in rollout.iterate_rollouts():
if hidden_states_name in rollout.data_names:
# Get initial hidden state from first step
hidden = policy._unpack_hidden(ro[0][hidden_states_name])
else:
# Let the network pick the default hidden state
hidden = None
# Reshape observations to match PyTorch's RNN sequence protocol
obs = ro.get_data_values("observations", True).unsqueeze(1)
obs = obs.to(device=policy.device, dtype=to.get_default_dtype())
# Pass the input through hidden RNN layers
out, _ = policy.rnn_layers(obs, hidden)
# And through the output layer
act = policy.output_layer(out.squeeze(1))
if policy.output_nonlin is not None:
act = policy.output_nonlin(act)
# Collect the actions
act_list.append(act)
# Set policy, i.e. PyTorch nn.Module, back to training mode
policy.train()
return to.cat(act_list)
# Get some rollouts
ros = []
for i in range(5):
ro = rollout(env, policy, eval=True, render_mode=RenderMode())
ro.torch(to.get_default_dtype())
ros.append(ro)
# Perform concatenation
cat = StepSequence.concat(ros)
# Evaluate old and new approaches
act_old = old_evaluate(cat)
act_new = policy.evaluate(cat)
to.testing.assert_allclose(act_old, act_new)


@pytest.mark.recurrent_policy
@pytest.mark.parametrize(
"env",
["default_pend", "default_qbb"],
ids=["pend", "qbb"],
indirect=True,
)
@pytest.mark.parametrize(
"policy",
["thrnn_policy", "thgru_policy", "thlstm_policy"],
ids=["thrnn", "thgru", "thlstm"],
indirect=True,
)
def test_twoheaded_policy_evaluate_packed_padded_sequences(env: Env, policy: RecurrentPolicy):
# Test packed padded sequence implementation for custom recurrent neural networks
    def old_evaluate(rollout: StepSequence, hidden_states_name: str = "hidden_states"):
# Set policy, i.e. PyTorch nn.Module, to evaluation mode
policy.eval()
act_list = []
head2_list = []
for ro in rollout.iterate_rollouts():
if hidden_states_name in rollout.data_names:
# Get initial hidden state from first step
hidden = ro[0][hidden_states_name]
else:
# Let the network pick the default hidden state
hidden = None
# Run steps consecutively reusing the hidden state
for step in ro:
act, head2, hidden = policy(step.observation, hidden)
act_list.append(act)
head2_list.append(head2)
# Set policy, i.e. PyTorch nn.Module, back to training mode
policy.train()
return to.stack(act_list), to.stack(head2_list)
# Get some rollouts
ros = []
for i in range(5):
ro = rollout(env, policy, eval=True, render_mode=RenderMode())
ro.torch(to.get_default_dtype())
ros.append(ro)
# Perform concatenation
cat = StepSequence.concat(ros)
# Evaluate old and new approaches
output_1_old, output_2_old = old_evaluate(cat)
output_1_new, output_2_new = policy.evaluate(cat)
to.testing.assert_allclose(output_1_old, output_1_new)
to.testing.assert_allclose(output_2_old, output_2_new)


@pytest.mark.recurrent_policy
@pytest.mark.parametrize(
"env",
["default_pend", "default_qbb"],
ids=["pend", "qbb"],
indirect=True,
)
@pytest.mark.parametrize(
"policy",
["adn_policy", "nf_policy"],
ids=["adn", "nf"],
indirect=True,
)
def test_potential_policy_evaluate_packed_padded_sequences(env: Env, policy: RecurrentPolicy):
# Test packed padded sequence implementation for custom recurrent neural networks
# Get some rollouts
ros = []
for i in range(5):
ro = rollout(env, policy, eval=True, render_mode=RenderMode())
ro.torch(to.get_default_dtype())
ros.append(ro)
# Perform concatenation
cat = StepSequence.concat(ros)
# Evaluate old and new approaches
act_new = policy.evaluate(cat)
assert act_new is not None


@pytest.mark.recurrent_policy
def test_hidden_state_packing_batch():
num_layers = 2
hidden_size = 2
batch_size = 2
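    # The unpacked hidden state has shape (num_layers, batch_size, hidden_size); the packed one holds one flat row per batch entry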
    unpacked = to.tensor(
        [
            [[1.0, 2.0], [5.0, 6.0]],  # layer 1: batch 1, batch 2
            [[3.0, 4.0], [7.0, 8.0]],  # layer 2: batch 1, batch 2
        ]
    )
packed = to.tensor([[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]])
# Test unpack
pu = default_unpack_hidden(packed, num_layers, hidden_size, batch_size)
to.testing.assert_allclose(pu, unpacked)
# Test pack
up = default_pack_hidden(unpacked, num_layers, hidden_size, batch_size)
to.testing.assert_allclose(up, packed)


@pytest.mark.recurrent_policy
def test_hidden_state_packing_nobatch():
num_layers = 2
hidden_size = 2
batch_size = None
    unpacked = to.tensor(
        [
            [[1.0, 2.0]],  # layer 1
            [[3.0, 4.0]],  # layer 2
        ]
    )
packed = to.tensor([1.0, 2.0, 3.0, 4.0])
# Test unpack
pu = default_unpack_hidden(packed, num_layers, hidden_size, batch_size)
to.testing.assert_allclose(pu, unpacked)
# Test pack
up = default_pack_hidden(unpacked, num_layers, hidden_size, batch_size)
to.testing.assert_allclose(up, packed)
[docs]@pytest.mark.parametrize("env", ["default_bob", "default_qbb"], ids=["bob", "qbb"], indirect=True)
@pytest.mark.parametrize(
"policy",
[
# TimePolicy and Two-headed policies are not supported
"linear_policy",
"fnn_policy",
],
ids=["lin", "fnn"],
indirect=True,
)
def test_script_nonrecurrent(env: Env, policy: Policy):
# Generate scripted version
scripted = policy.double().script()
# Compare results
sample = policy.env_spec.obs_space.sample_uniform()
obs = to.from_numpy(sample)
act_reg = policy(obs)
act_script = scripted(obs)
to.testing.assert_allclose(act_reg, act_script)


@pytest.mark.recurrent_policy
@pytest.mark.parametrize("env", ["default_bob", "default_qbb"], ids=["bob", "qbb"], indirect=True)
@pytest.mark.parametrize(
"policy",
[
# Two-headed policies are not supported
"rnn_policy",
"lstm_policy",
"gru_policy",
"adn_policy",
"nf_policy",
],
ids=["rnn", "lstm", "gru", "adn", "nf"],
indirect=True,
)
def test_script_recurrent(env: Env, policy: Policy):
# Generate scripted version
scripted = policy.double().script()
# Compare results, tracing hidden manually
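    # The scripted module keeps track of its hidden state internally, so it is only fed the observation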
hidden = policy.init_hidden()
# Run one step
sample = policy.env_spec.obs_space.sample_uniform()
obs = to.from_numpy(sample)
act_reg, hidden = policy(obs, hidden)
act_script = scripted(obs)
to.testing.assert_allclose(act_reg, act_script)
# Run second step
sample = policy.env_spec.obs_space.sample_uniform()
obs = to.from_numpy(sample)
act_reg, hidden = policy(obs, hidden)
act_script = scripted(obs)
to.testing.assert_allclose(act_reg, act_script)
# Test after reset
hidden = policy.init_hidden()
scripted.reset()
sample = policy.env_spec.obs_space.sample_uniform()
obs = to.from_numpy(sample)
act_reg, hidden = policy(obs, hidden)
act_script = scripted(obs)
to.testing.assert_allclose(act_reg, act_script)


@to.no_grad()
@m_needs_libtorch
@pytest.mark.parametrize("env", ["default_bob", "default_qbb"], ids=["bob", "qbb"], indirect=True)
@pytest.mark.parametrize(
"policy",
[
# TimePolicy and Two-headed policies are not supported
"linear_policy",
"fnn_policy",
"rnn_policy",
"lstm_policy",
"gru_policy",
"adn_policy",
"nf_policy",
],
ids=["lin", "fnn", "rnn", "lstm", "gru", "adn", "nf"],
indirect=True,
)
@pytest.mark.parametrize("file_type", [".pt", ".zip"], ids=["pt", "zip"])
def test_export_cpp(env: Env, policy: Policy, tmpdir: str, file_type):
# Generate scripted version (in double mode for CPP compatibility)
scripted = policy.double().script()
# Export
export_file = osp.join(tmpdir, "policy" + file_type)
scripted.save(export_file)
# Import again
loaded = to.jit.load(export_file)
# Compare a couple of inputs
for i in range(50):
obs = policy.env_spec.obs_space.sample_uniform()
obs_to = to.from_numpy(obs) # is already double
act_scripted = scripted(obs_to).cpu().numpy()
act_loaded = loaded(to.from_numpy(obs)).cpu().numpy()
assert act_loaded == pytest.approx(act_scripted), f"Wrong action values on step #{i}"
# Test after reset
if hasattr(scripted, "reset"):
scripted.reset()
loaded.reset()
assert loaded.hidden.numpy() == pytest.approx(scripted.hidden.numpy()), "Wrong hidden state after reset"
obs = policy.env_spec.obs_space.sample_uniform()
obs_to = to.from_numpy(obs) # is already double
act_scripted = scripted(obs_to).cpu().numpy()
act_loaded = loaded(to.from_numpy(obs)).cpu().numpy()
assert act_loaded == pytest.approx(act_scripted), "Wrong action values after reset"


@to.no_grad()
@m_needs_rcs
@m_needs_libtorch
@pytest.mark.parametrize("env", ["default_bob", "default_qbb"], ids=["bob", "qbb"], indirect=True)
@pytest.mark.parametrize(
"policy",
[
# TimePolicy and Two-headed policies are not supported
"linear_policy",
"fnn_policy",
"rnn_policy",
"lstm_policy",
"gru_policy",
"adn_policy",
"nf_policy",
],
ids=["lin", "fnn", "rnn", "lstm", "gru", "adn", "nf"],
indirect=True,
)
def test_export_rcspysim(env: Env, policy: Policy, tmpdir: str):
from rcsenv import ControlPolicy
# Generate scripted version (double mode for CPP compatibility)
scripted = policy.double().script()
print(scripted.graph)
# Export
export_file = osp.join(tmpdir, "policy.pt")
to.jit.save(scripted, export_file)
    # Import in C++
cpp = ControlPolicy("torch", export_file)
# Compare a couple of inputs
for _ in range(50):
obs = policy.env_spec.obs_space.sample_uniform()
obs = to.from_numpy(obs).to(dtype=to.double)
act_script = scripted(obs).cpu().numpy()
act_cpp = cpp(obs, policy.env_spec.act_space.flat_dim)
assert act_cpp == pytest.approx(act_script)
# Test after reset
if hasattr(scripted, "reset"):
scripted.reset()
cpp.reset()
obs = policy.env_spec.obs_space.sample_uniform()
obs = to.from_numpy(obs).to(dtype=to.double)
act_script = scripted(obs).cpu().numpy()
act_cpp = cpp(obs, policy.env_spec.act_space.flat_dim)
assert act_cpp == pytest.approx(act_script)
[docs]@pytest.mark.parametrize("in_features", [1, 3], ids=["1dim", "3dim"])
@pytest.mark.parametrize("same_nonlin", [True, False], ids=["same_nonlin", "different_nonlin"])
@pytest.mark.parametrize("bias", [True, False], ids=["bias", "no_bias"])
@pytest.mark.parametrize("weight", [True, False], ids=["weight", "no_weight"])
def test_indi_nonlin_layer(in_features, same_nonlin, bias, weight):
if not same_nonlin and in_features > 1:
nonlin = in_features * [to.tanh]
else:
nonlin = to.sigmoid
layer = IndiNonlinLayer(in_features, nonlin, bias, weight)
assert isinstance(layer, nn.Module)
i = to.randn(in_features)
o = layer(i)
assert isinstance(o, to.Tensor)
assert i.shape == o.shape


@to.no_grad()
@pytest.mark.parametrize("env", ["default_bob", "default_qbb"], ids=["bob", "qbb"], indirect=True)
@pytest.mark.parametrize("dtype", ["torch", "numpy"], ids=["torch", "numpy"])
def test_playback_policy(env: Env, dtype):
# Create 2 recordings of different length
if dtype == "torch":
actions = [to.randn(10, env.spec.act_space.flat_dim), to.randn(7, env.spec.act_space.flat_dim)]
else:
actions = [np.random.randn(10, env.spec.act_space.flat_dim), np.random.randn(7, env.spec.act_space.flat_dim)]
policy = PlaybackPolicy(env.spec, act_recordings=actions)
if dtype == "torch":
actions = [a.numpy() for a in actions]
# Sample one rollout and check the actions
ro = rollout(env, policy)
assert policy.curr_rec == 0
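    # The first recording only covers 10 steps; afterwards the policy pads the rollout with zero actions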
assert np.allclose(ro.actions[:10, :], actions[0])
assert np.allclose(ro.actions[10:, :], np.zeros(env.spec.act_space.flat_dim))
# Sample another rollout and check the actions
ro2 = rollout(env, policy)
assert policy.curr_rec == 1
assert np.allclose(ro2.actions[:7, :], actions[1])
assert np.allclose(ro2.actions[7:, :], np.zeros(env.spec.act_space.flat_dim))
# Check the properties
policy.curr_step = 3
assert policy.curr_step == 3
policy.curr_rec = 0
assert policy.curr_rec == 0
policy.reset_curr_rec()
assert policy.curr_rec == -1
[docs]@pytest.mark.parametrize("env", ["default_pend", "default_qbb"], ids=["pend", "qbb"], indirect=True)
@pytest.mark.parametrize("cond_lvl", ["vel", "acc"], ids=["vel", "acc"])
@pytest.mark.parametrize("cond_final", ["zero", "one"], ids=["zero", "one"])
@pytest.mark.parametrize("cond_init", [None, "rand"], ids=["default", "rand"])
@pytest.mark.parametrize("overtime_behavior", ["hold", "zero"], ids=["hold", "zero"])
@pytest.mark.parametrize("use_cuda", [False, pytest.param(True, marks=m_needs_cuda)], ids=["cpu", "cuda"])
def test_poly_time_policy(env: Env, cond_lvl: str, cond_final: str, cond_init, overtime_behavior: str, use_cuda: bool):
order = 3 if cond_lvl == "vel" else 5
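    # Each end of the spline is constrained by (order + 1) / 2 conditions: position and velocity for a cubic polynomial, plus acceleration for a quintic one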
num_cond = (order + 1) // 2
    # Remember the requested mode before cond_final is turned into a tensor of conditions
    final_mode = cond_final
    if cond_final == "zero":
cond_final = to.zeros(num_cond, env.act_space.flat_dim)
elif cond_final == "one":
cond_final = to.zeros(num_cond, env.act_space.flat_dim)
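        # Set only the position condition to one; the derivative conditions remain zero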
cond_final[::num_cond] = 1.0
if cond_init == "rand":
cond_init = to.randn(num_cond, env.act_space.flat_dim)
# Create instance
policy = PolySplineTimePolicy(
spec=env.spec,
dt=env.dt,
t_end=int(env.max_steps * env.dt),
cond_lvl=cond_lvl,
cond_final=cond_final,
cond_init=cond_init,
overtime_behavior=overtime_behavior,
use_cuda=use_cuda,
)
policy.reset()
act_hist = []
for _ in range(env.max_steps):
act = policy(None)
act_hist.append(act.detach().cpu())
    if final_mode == "zero":
        assert to.allclose(act_hist[-1], to.zeros_like(act))
    elif final_mode == "one":
        assert to.allclose(act_hist[-1], to.ones_like(act))
# Check overtime behavior
policy.reset()
act_hist_ot = []
for _ in range(2 * env.max_steps):
act = policy(env.obs_space.sample_uniform())
act_hist_ot.append(act.detach().cpu())
if overtime_behavior == "hold":
        # With the hold behavior, every action after t_end equals the action at t_end
        assert to.allclose(act_hist_ot[-1], act_hist_ot[-2])
elif overtime_behavior == "zero":
assert to.allclose(act_hist_ot[-1], to.zeros_like(act))