# Source code for tests.test_utils

# Copyright (c) 2020, Fabio Muratore, Honda Research Institute Europe GmbH, and
# Technical University of Darmstadt.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of Fabio Muratore, Honda Research Institute Europe GmbH,
#    or Technical University of Darmstadt, nor the names of its contributors may
#    be used to endorse or promote products derived from this software without
#    specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL FABIO MURATORE, HONDA RESEARCH INSTITUTE EUROPE GMBH,
# OR TECHNICAL UNIVERSITY OF DARMSTADT BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import copy
import functools
import math
import os.path as osp
from math import ceil
from typing import Union

import numpy as np
import pytest
import torch as to
import torch.nn as nn
from tqdm import tqdm

import pyrado
from pyrado.environments.pysim.ball_on_beam import BallOnBeamSim
from pyrado.logger.iteration import IterationTracker
from pyrado.policies.feed_forward.dummy import DummyPolicy
from pyrado.sampling.rollout import rollout
from pyrado.sampling.step_sequence import StepSequence
from pyrado.sampling.utils import gen_ordered_batch_idcs, gen_ordered_batches, gen_shuffled_batch_idcs
from pyrado.spaces import BoxSpace
from pyrado.utils.averaging import RunningExpDecayingAverage, RunningMemoryAverage
from pyrado.utils.bijective_transformation import IdentityTransformation, LogTransformation, SqrtTransformation
from pyrado.utils.checks import (
    check_all_equal,
    check_all_lengths_equal,
    check_all_shapes_equal,
    check_all_types_equal,
    is_iterator,
    is_sequence,
)
from pyrado.utils.data_processing import (
    MinMaxScaler,
    RunningNormalizer,
    RunningStandardizer,
    Standardizer,
    correct_atleast_2d,
    normalize,
    scale_min_max,
)
from pyrado.utils.data_types import (
    DSSpec,
    LinDSSpec,
    MSDDSSpec,
    merge_dicts,
    repeat_interleave,
    update_matching_keys_recursively,
)
from pyrado.utils.functions import noisy_nonlin_fcn, skyline
from pyrado.utils.input_output import completion_context, print_cbt_once
from pyrado.utils.math import cosine_similarity, cov, logmeanexp, numerical_differentiation_coeffs, rmse
from pyrado.utils.optimizers import GSS


@pytest.mark.parametrize(
    "x, data_along_rows",
    [(np.random.rand(100, 4), True), (np.random.rand(4, 100), False)],
    ids=["100_4", "4_100"],
)
def test_cov(x, data_along_rows):
    """Compare pyrado's `cov` with `numpy.cov` for both data layouts."""
    # numpy's `rowvar` flag is the logical negation of `data_along_rows`
    reference = np.cov(x, rowvar=not data_along_rows)
    result = cov(to.from_numpy(x), data_along_rows=data_along_rows).numpy()

    # Expected edge length of the (square) covariance matrix
    expected_dim = x.shape[1] if data_along_rows else x.shape[0]

    assert result.shape[0] == result.shape[1]
    assert reference.shape[0] == expected_dim
    assert result.shape[0] == expected_dim
    assert np.allclose(reference, result)
@pytest.mark.parametrize(
    "env, expl_strat",
    [
        (BallOnBeamSim(dt=0.02, max_steps=100), DummyPolicy(BallOnBeamSim(dt=0.02, max_steps=100).spec)),
    ],
    ids=["bob_dummy"],
)
def test_concat_rollouts(env, expl_strat):
    """Concatenating two rollouts must yield one `StepSequence` with the summed length."""
    rollouts = [rollout(env, expl_strat) for _ in range(2)]
    merged = StepSequence.concat(rollouts)

    assert isinstance(merged, StepSequence)
    assert merged.length == rollouts[0].length + rollouts[1].length
@pytest.mark.parametrize(
    "x, y",
    [
        (to.tensor([1.0, 2.0, 3.0]), to.tensor([1.0, 2.0, 3.0])),
        (to.tensor([1.0, 0.0, 1.0]), to.tensor([1.0, 1000.0, 1.0])),
        (to.tensor([0.0, 0.0, 0.0]), to.tensor([1.0, 2, 3.0])),
        (to.tensor([1.0, 2.0, 3.0]), to.tensor([2.0, 4.0, 6.0])),
        (to.tensor([1.0, 2.0, 3.0]), to.tensor([-1.0, -2.0, -3.0])),
    ],
    ids=["same", "similarity_1", "similarity_0", "colinear_scaled", "colinear_opposite"],
)
def test_cosine_similarity(x, y):
    """Check the type and the value of the cosine similarity for hand-picked vector pairs."""
    # Only vector inputs are covered here
    similarity = cosine_similarity(x, y)
    assert isinstance(similarity, to.Tensor)

    # Every example pair was picked such that the result is close to 0, 1, or -1
    admissible = (0.0, 1.0, -1.0)
    assert any(to.isclose(similarity, to.tensor(target)) for target in admissible)
@pytest.mark.parametrize("type", ["numpy", "torch"], ids=["numpy", "torch"])
@pytest.mark.parametrize("dim", [0, 1], ids=["dim0", "dim1"])
def test_rmse(type, dim):
    """Check that `rmse` returns the same container type (array or tensor) it was given."""
    # NOTE(review): `dim` is parametrized but never forwarded to `rmse` -- confirm whether it should be
    shape = (42, 21)
    use_numpy = type == "numpy"

    # Draw both inputs from the standard normal sampler of the selected library
    sampler = np.random.randn if use_numpy else to.randn
    x = sampler(*shape)
    y = sampler(*shape)

    e = rmse(x, y)

    expected_cls = np.ndarray if use_numpy else to.Tensor
    assert isinstance(e, expected_cls)
@pytest.mark.parametrize(
    "x, y",
    [
        ({"a": 1, "b": 2}, {"c": 1, "d": 4}),
        ({"a": 1, "b": 2}, {"b": 3, "d": 4}),
    ],
    ids=["disjoint", "overlapping"],
)
def test_merge_lod_var_dtype(x, y):
    """Merge two dicts with disjoint or overlapping keys and check the resulting entries."""
    z = merge_dicts([x, y])
    assert z["a"] == 1

    # The value stored under "b" reveals which of the two scenarios is being run
    b_val = z["b"]
    assert b_val in (2, 3)
    if b_val == 2:
        # disjoint
        assert z["c"] == 1
    else:
        # overlapping
        assert len(z) == 3

    assert z["d"] == 4
@pytest.mark.parametrize(
    "seq",
    [
        [1, 2, 3],
        (1, 2, 3),
    ],
    ids=["list", "tuple"],
)
@pytest.mark.parametrize("num_reps", [3])
def test_repeat_interleave(seq, num_reps):
    """Check length, leading elements, and container type of the repeated sequence."""
    repeated = repeat_interleave(seq, num_reps)
    assert len(repeated) == num_reps * len(seq)

    # Only the repetitions of the first element are checked
    first = seq[0]
    assert all(repeated[i] == first for i in range(num_reps))

    # The container type (list or tuple) must be preserved
    assert type(repeated) is type(seq)
@pytest.mark.parametrize(
    "batch_size, data_size",
    [(3, 30), (3, 29), (3, 28), (2, 2)],
    ids=["division_mod0", "division_mod1", "division_mod2", "edge_case"],
)
@pytest.mark.parametrize("sorted", [True, False], ids=["sorted", "unsorted"])
def test_gen_batch_idcs(batch_size, data_size, sorted):
    """Check the number and the size of the index batches from both batch index generators."""
    expected_num_batches = ceil(data_size / batch_size)

    # Shuffled indices
    shuffled_batches = list(gen_shuffled_batch_idcs(batch_size, data_size))
    assert len(shuffled_batches) == expected_num_batches
    for batch in shuffled_batches:
        assert len(batch) <= batch_size

    # Ordered indices
    ordered_batches = list(gen_ordered_batch_idcs(batch_size, data_size, sorted))
    assert len(ordered_batches) == expected_num_batches
    for batch in ordered_batches:
        assert len(batch) <= batch_size

    # Within every mini-batch, the indices must be non-decreasing
    for batch in ordered_batches:
        assert all(batch[i] <= batch[i + 1] for i in range(len(batch) - 1))
@pytest.mark.parametrize(
    "data, batch_size",
    [
        (list(range(9)), 3),
        (list(range(10)), 3),
    ],
    ids=["division_mod0", "division_mod1"],
)
def test_gen_ordered_batches(data, batch_size):
    """All batches but the last must be full, the last one may be shorter."""
    last_idx = ceil(len(data) / batch_size) - 1

    for idx, batch in enumerate(gen_ordered_batches(data, batch_size)):
        if idx < last_idx:
            assert len(batch) == batch_size
        elif idx == last_idx:
            assert len(batch) <= batch_size
@pytest.mark.parametrize("dtype", ["torch", "numpy"], ids=["to", "np"])
@pytest.mark.parametrize("axis", [0, 1], ids=["ax_0", "ax_1"])
def test_normalize(dtype, axis):
    """After normalizing with `order=1`, the entries must sum to one along the chosen axis."""
    for _ in range(10):
        if dtype == "torch":
            x = to.rand(5, 3)
        else:
            x = np.random.rand(5, 3)

        normalized = normalize(x, axis=axis, order=1)

        # Convert to numpy such that the result can be compared using pytest.approx
        if isinstance(normalized, to.Tensor):
            normalized = normalized.numpy()
        assert np.sum(normalized, axis=axis) == pytest.approx(1.0)
@pytest.mark.parametrize("dtype", ["torch", "numpy", "mixed"], ids=["to", "np", "mixed"])
@pytest.mark.parametrize(
    "lb, ub",
    [(0, 1), (-1, 1), (-2.5, 0), (-np.ones((3, 2)), np.ones((3, 2)))],
    ids=["lb0_ub1", "lb-1_ub1", "lb-2.5_ub0", "np_ones"],
)
def test_scale_min_max(dtype, lb, ub):
    """The output of `scale_min_max` must lie within the given lower and upper bounds."""
    for _ in range(10):
        # The bounds are rebuilt in every iteration since they may get converted to numpy below
        if dtype == "torch":
            bound_lo = to.tensor([lb], dtype=to.float64)
            bound_up = to.tensor([ub], dtype=to.float64)
        elif dtype == "numpy":
            bound_lo = np.array(lb, dtype=np.float64)
            bound_up = np.array(ub, dtype=np.float64)
        else:
            # "mixed": pass the raw parametrized bounds
            bound_lo, bound_up = lb, ub

        if dtype == "torch":
            x = 1e2 * to.rand(3, 2)
        else:
            x = np.random.rand(3, 2)

        x_scaled = scale_min_max(x, bound_lo, bound_up)

        # Convert everything to numpy for the element-wise comparison
        if all(isinstance(item, to.Tensor) for item in (x_scaled, bound_lo, bound_up)):
            x_scaled = x_scaled.numpy()
            bound_lo = bound_lo.numpy()
            bound_up = bound_up.numpy()

        template = np.ones_like(x_scaled)
        assert np.all(bound_lo * template <= x_scaled)
        assert np.all(x_scaled <= bound_up * template)
# NOTE: `test_minmaxscaler` is defined further down in this module. A second, identical definition at
# this position would be rebound by the later one at import time, and would never be collected by pytest.
@pytest.mark.parametrize(
    "data_seq, axis",
    [
        ([np.array([1, 1, 2]), np.array([1, 6, 3]), np.array([1, 6, 3]), np.array([10, -20, 20])], 0),
        ([np.array([1, 1, 2]), np.array([1, 6, 3]), np.array([1, 6, 3]), np.array([10, -20, 20])], None),
        ([np.array([1, 1, 2, 2]), np.array([1, 6, 3]), np.array([1, 6, 3]), np.array([10, 10, -20, 20])], 0),
        ([np.array([1, 1, 2, 2]), np.array([1, 6, 3]), np.array([1, 6, 3]), np.array([10, 10, -20, 20])], None),
        (
            [
                to.tensor([1.0, 1.0, 2]),
                to.tensor([1.0, 6.0, 3.0]),
                to.tensor([1.0, 6.0, 3.0]),
                to.tensor([10.0, -20.0, 20.0]),
            ],
            0,
        ),
        (
            [
                to.tensor([1.0, 1.0, 2]),
                to.tensor([1.0, 6.0, 3.0]),
                to.tensor([1.0, 6.0, 3.0]),
                to.tensor([10.0, -20.0, 20.0]),
            ],
            -1,
        ),
        (
            [to.tensor([1.0, 1, 2, 2]), to.tensor([1.0, 6, 3]), to.tensor([1.0, 6, 3]), to.tensor([10.0, 10, -20, 20])],
            0,
        ),
        (
            [to.tensor([1.0, 1, 2, 2]), to.tensor([1.0, 6, 3]), to.tensor([1.0, 6, 3]), to.tensor([10.0, 10, -20, 20])],
            -1,
        ),
    ],
    ids=[
        "np_same_length_0",
        "np_same_length_None",
        "np_mixed_length_0",
        "np_mixed_length_None",
        "to_same_length_0",
        "to_same_length_-1",
        "to_mixed_length_0",
        "to_mixed_length_-1",
    ],
)
def test_running_standardizer(data_seq, axis):
    """Feed a sequence of samples to a `RunningStandardizer`, then check that `reset` clears it."""
    standardizer = RunningStandardizer()

    for sample in data_seq:
        out = standardizer(sample, axis)
        assert out is not None

    # After the reset, no statistics may be left
    standardizer.reset()
    assert standardizer.mean is None
    assert standardizer.sum_sq_diffs is None
    assert standardizer.iter == 0
@pytest.mark.parametrize(
    "data_seq, alpha",
    [
        ([np.array([1, 1, 2]), np.array([1, 6, 3]), np.array([1, 6, 3]), np.array([10, -20, 20])], 0.9),
        (
            [
                to.tensor([1.0, 1.0, 2]),
                to.tensor([1.0, 6.0, 3.0]),
                to.tensor([1.0, 6.0, 3.0]),
                to.tensor([10.0, -20.0, 20.0]),
            ],
            0.1,
        ),
    ],
    ids=["np", "to"],
)
def test_running_expdecay_average(data_seq, alpha):
    """Feed a sequence of samples to a `RunningExpDecayingAverage`, then reset it with a new alpha."""
    averager = RunningExpDecayingAverage(alpha)

    for sample in data_seq:
        estimate = averager(sample)
        assert estimate is not None

    # The reset must store the new factor and drop the previous estimate
    averager.reset(alpha=0.5)
    assert averager._alpha == 0.5
    assert averager._prev_est is None
@pytest.mark.parametrize(
    "data_seq, capacity",
    [
        ([np.array([1.0, 1, 2]), np.array([1.0, 1, 2]), np.array([1.0, 1, 2]), np.array([-2.0, -2, -4])], 3),
        ([to.tensor([1.0, 1, 2]), to.tensor([1.0, 1, 2]), to.tensor([1.0, 1, 2]), to.tensor([-2.0, -2, -4])], 3),
    ],
    ids=["np", "to"],
)
def test_running_mem_average(data_seq, capacity):
    """Check the averages returned by a `RunningMemoryAverage` step by step, then reset it."""
    averager = RunningMemoryAverage(capacity)

    for step, sample in enumerate(data_seq):
        avg = averager(sample)
        if isinstance(avg, np.ndarray):
            avg = avg.astype(dtype=np.float32)

        # The first three inputs are identical, the fourth one is their negated double
        if step <= 2:
            expected = [1.0, 1, 2]
        elif step == 3:
            expected = [0.0, 0, 0]
        else:
            continue
        assert avg == pytest.approx(expected)

    # The reset must store the new capacity and drop the memory
    averager.reset(capacity=5)
    assert averager.capacity == 5
    assert averager.memory is None
@pytest.mark.parametrize(
    "data_seq",
    [
        [5 * np.random.rand(25, 3), 0.1 * np.random.rand(5, 3), 20 * np.random.rand(70, 3)],
        [5 * to.rand(25, 3), 0.1 * to.rand(5, 3), 20 * to.rand(70, 3)],
    ],
    ids=["np", "to"],
)
def test_running_normalizer(data_seq):
    """Every output of a `RunningNormalizer` must lie within [-1, 1]."""
    normalizer = RunningNormalizer()

    for chunk in data_seq:
        normalized = normalizer(chunk)
        assert (normalized >= -1).all()
        assert (normalized <= 1).all()
@pytest.mark.parametrize("data_type", ["numpy", "torch"], ids=["numpy", "torch"])
@pytest.mark.parametrize("shape", [(1000, 1), (1, 1000), (1000, 1000)], ids=["1000x1", "1x1000", "1000x1000"])
def test_stateful_standardizer(data_type: str, shape: tuple):
    """
    Standardize random data with a `Standardizer`, check the statistics of the result, and check that
    `unstandardize` restores the input.

    :param data_type: selects if the data is a numpy array or a PyTorch tensor
    :param shape: shape of the random data; the test ids are given as rows x columns
    """
    pyrado.set_seed(0)

    if data_type == "numpy":
        x = 100 * np.random.rand(*shape)
    elif data_type == "torch":
        x = 100 * to.rand(shape)

    ss = Standardizer()

    if isinstance(x, to.Tensor):
        x_stdized = ss.standardize(x)
        assert x_stdized.shape == x.shape
        # Explicit absolute tolerances are used for the statistics of the tensor
        assert to.allclose(x_stdized.mean(), to.zeros(1), atol=1e-6)
        assert to.allclose(x_stdized.std(), to.ones(1), atol=1e-6)

        x_restrd = ss.unstandardize(x_stdized)
        assert x_restrd.shape == x.shape
        assert to.allclose(x_restrd, x, rtol=1e-02, atol=1e-05)

    elif isinstance(x, np.ndarray):
        x_stdized = ss.standardize(x)
        assert x_stdized.shape == x.shape
        assert np.allclose(x_stdized.mean(), np.zeros(1))
        assert np.allclose(x_stdized.std(), np.ones(1))

        x_restrd = ss.unstandardize(x_stdized)
        assert x_restrd.shape == x.shape
        assert np.allclose(x_restrd, x, rtol=1e-02, atol=1e-05)
@pytest.mark.parametrize("g, ed", [(1.0, 2.0), (np.array([-1.0, 2.0]), np.eye(2))], ids=["scalar", "array"])
def test_ds_spec(g, ed):
    """
    Construct the dynamical system specifications and check that they are dicts holding the passed values.

    :param g: goal, either a scalar or an array
    :param ed: error dynamics passed to the linear specification, either a scalar or an array
    """
    goal_is_array = isinstance(g, np.ndarray)

    # Base class
    dss = DSSpec(function="name", goal=g)
    assert isinstance(dss, dict)
    assert dss["function"] == "name"
    if goal_is_array:
        # Arrays compare element-wise, thus the result has to be reduced
        assert np.all(dss["goal"] == g)
    else:
        assert dss["goal"] == g

    # Linear first order subclass
    lds = LinDSSpec(function="lin", goal=g, errorDynamics=ed)
    assert isinstance(lds, dict)  # check the object that was just created
    assert lds["function"] == "lin"
    if goal_is_array:
        assert np.all(lds["goal"] == g)
        assert np.all(lds["errorDynamics"] == ed)
    else:
        assert lds["goal"] == g
        assert lds["errorDynamics"] == ed

    # Mass-Spring-Damper subclass
    msds = MSDDSSpec(function="msd", goal=g, damping=2.0, attractorStiffness=3.0, mass=4.0)
    assert isinstance(msds, dict)  # check the object that was just created
    assert msds["function"] == "msd"
    if goal_is_array:
        assert np.all(msds["goal"] == g)
    else:
        assert msds["goal"] == g
    assert msds["damping"] == 2.0
    assert msds["attractorStiffness"] == 3.0
    assert msds["mass"] == 4.0
@pytest.mark.parametrize("identical_bounds", [True, False], ids=["identical", "separate"])
def test_gss_optimizer_identical_bounds(identical_bounds):
    """Run two `GSS` steps on two parameters, with shared bounds or with one overridden lower bound."""

    class Dummy:
        def __init__(self):
            # Test with different lower and upper bounds
            self.x, self.y = to.tensor([0.0]), to.tensor([4.0])
            x_min, x_max = to.tensor([-10.0]), to.tensor([5.0])

            # The group of x optionally carries its own lower bound
            group_x = {"params": self.x}
            if not identical_bounds:
                group_x["param_min"] = to.tensor([-6.0])
            self.optim = GSS([group_x, {"params": self.y}], x_min, x_max)

        def loss_fcn(self):
            # Some function to minimize
            return (self.x + self.y + 4) ** 2

    dummy = Dummy()

    for _ in range(2):
        dummy.optim.step(dummy.loss_fcn)

    assert dummy.x != dummy.y
    print(f"x = {dummy.x.item()} \t y = {dummy.y.item()}")
def test_gss_optimizer_functional():
    """After 100 `GSS` steps on (x + 4)^2, the parameter must be close to -4."""

    class Dummy:
        def __init__(self):
            # Test with different lower and upper bounds
            self.x = to.tensor([0.0])
            lower, upper = to.tensor([-10.0]), to.tensor([10.0])
            self.optim = GSS([{"params": self.x}], lower, upper)

        def loss_fcn(self):
            # Some function to minimize
            return (self.x + 4) ** 2

    dummy = Dummy()

    num_steps = 100
    for _ in range(num_steps):
        dummy.optim.step(dummy.loss_fcn)

    distance_to_optimum = to.norm(dummy.x + 4)
    assert distance_to_optimum < 1e-4
@pytest.mark.visual
def test_gss_optimizer_nlin_fcn():
    """Run `GSS` on a noisy nonlinear function, plot every step, and compare the final with the initial value."""
    from matplotlib import pyplot as plt

    # Parameters
    x_grid = to.linspace(-2.0, 3.0, 200)
    f = 1.0
    noise_std = 0.1
    num_epochs = 10

    # Init param and optimizer
    grid_min, grid_max = x_grid.min(), x_grid.max()
    grid_span = grid_max - grid_min
    x_init = to.rand(1) * grid_span / 2 + grid_min + grid_span / 4  # inner half of the grid, i.e. [.25, .75]
    x = nn.Parameter(to.tensor([x_init]), requires_grad=False)
    optimizer = GSS([x], param_min=grid_min.unsqueeze(0), param_max=grid_max.unsqueeze(0))
    obj_fcn = functools.partial(noisy_nonlin_fcn, x=x, f=f, noise_std=noise_std)

    # Init plotting
    plt.figure()
    plt.plot(x_grid, noisy_nonlin_fcn(x=x_grid, f=f), label="noise free fcn")
    plt.scatter(x.data.numpy(), obj_fcn().numpy(), s=40, marker="x", color="k", label="init guess")
    colors = plt.get_cmap("inferno")(np.linspace(0, 1, num_epochs))

    for epoch in tqdm(range(num_epochs), total=num_epochs):
        # Let the optimizer update the parameter
        optimizer.step(obj_fcn)

        # Plot one noisy realization of the function and the current evaluation
        plt.plot(x_grid, noisy_nonlin_fcn(x=x_grid, f=f, noise_std=noise_std), alpha=0.2)
        plt.scatter(x.data.numpy(), obj_fcn().numpy(), s=16, color=colors[epoch])

    plt.xlabel("$x$")
    plt.ylabel("$f(x)$")
    plt.legend()
    # The figure is closed without being shown
    plt.close()

    # Both values are noisy evaluations
    value_final = noisy_nonlin_fcn(x, f=f, noise_std=noise_std)
    value_init = noisy_nonlin_fcn(x_init, f=f, noise_std=noise_std)
    assert value_final < value_init
@pytest.mark.parametrize("dt", [0.1], ids=["0.1"])
@pytest.mark.parametrize("t_end", [6.0], ids=["6.0"])
@pytest.mark.parametrize(
    "t_intvl_space",
    [
        BoxSpace(0.1, 0.11, shape=1),
        BoxSpace(0.123, 0.456, shape=1),
        BoxSpace(10.0, 20.0, shape=1),
    ],
    ids=["small_time_intvl", "real_time_intvl", "large_time_intvl"],
)
@pytest.mark.parametrize("val_space", [BoxSpace(-5.0, 3.0, shape=1)], ids=["-5_to_3"])
def test_skyline(dt: Union[int, float], t_end: Union[int, float], t_intvl_space: BoxSpace, val_space: BoxSpace):
    """
    Create a skyline function and check that the time stamps and the values are equally long numpy arrays.

    :param dt: first argument passed to `skyline`
    :param t_end: second argument passed to `skyline`
    :param t_intvl_space: space passed to `skyline` as third argument
    :param val_space: space passed to `skyline` as fourth argument
    """
    # Create the skyline function
    t, vals = skyline(dt, t_end, t_intvl_space, val_space)

    assert isinstance(t, np.ndarray)
    assert isinstance(vals, np.ndarray)
    assert len(t) == len(vals)
def test_check_prompt():
    """Run `completion_context` once without an error and once with an exception raised inside."""
    # Regular case: the wrapped statement succeeds
    with completion_context("Works fine", color="g"):
        _ = 3

    # Failure case: the exception raised inside the context must reach the caller
    with pytest.raises(ZeroDivisionError), completion_context("Works fine", color="r", bright=True):
        _ = 3 / 0
@pytest.mark.parametrize("color", ["w", "g", "y", "c", "r", "b"], ids=["w", "g", "y", "c", "r", "b"])
@pytest.mark.parametrize("bright", [True, False], ids=["bright", "not_bright"])
def test_print_cbt_once(color, bright):
    """Call `print_cbt_once` repeatedly and check that its `has_run` flag is set afterwards."""
    # Reset the flag for this test
    print_cbt_once.has_run = False

    msg = "You should only read this once per color and brightness"
    for call_idx in range(10):
        print_cbt_once(msg, color, bright, tag="tag", end="\n")
        # The flag is checked from the second call on
        assert call_idx == 0 or print_cbt_once.has_run
@pytest.mark.parametrize("x", [to.rand(5, 3), np.random.rand(5, 3)], ids=["torch", "numpy"])
@pytest.mark.parametrize("dim", [0, 1], ids=["dim0", "dim1"])
def test_logmeanexp(x, dim):
    """Compare `logmeanexp` with the naive log(mean(exp(x))) along the given dimension."""
    lme = logmeanexp(x, dim)
    assert lme is not None

    if isinstance(x, to.Tensor) and isinstance(lme, to.Tensor):
        naive = to.log(to.mean(to.exp(x), dim=dim))
        assert to.allclose(lme, naive)

    if isinstance(x, np.ndarray):
        naive = np.log(np.mean(np.exp(x), axis=dim))
        assert np.allclose(lme, naive)
@pytest.mark.parametrize(
    "obj, file_ext",
    [
        (to.rand((5, 3)), "pt"),
        (np.random.rand(5, 3), "npy"),
        (DummyPolicy(BallOnBeamSim(dt=0.01, max_steps=500).spec), "pt"),
        (BallOnBeamSim(dt=0.01, max_steps=500), "pkl"),
    ],
    ids=["tensor", "ndarray", "dummypol", "pyenv"],
)
@pytest.mark.parametrize("prefix", ["", "pre"], ids=["default", "pre"])
@pytest.mark.parametrize("suffix", ["", "suf"], ids=["default", "suf"])
@pytest.mark.parametrize("use_state_dict", [True, False], ids=["use_state_dict", "not-use_state_dict"])
@pytest.mark.parametrize("verbose", [True, False], ids=["v", "q"])
def test_save_load(obj, file_ext: str, tmpdir, prefix: str, suffix: str, use_state_dict: bool, verbose: bool):
    """
    Save an object with `pyrado.save`, check the name of the created file, and load it with `pyrado.load`.

    :param obj: object to save and load
    :param file_ext: file extension matching the object
    :param tmpdir: pytest fixture providing a temporary directory
    :param prefix: prefix of the file name, empty string for none
    :param suffix: suffix of the file name, empty string for none
    :param use_state_dict: forwarded to `pyrado.save`
    :param verbose: forwarded to `pyrado.load`
    """
    # Save
    pyrado.save(obj, f"tmpname.{file_ext}", tmpdir, prefix, suffix, use_state_dict)

    # Check if sth has been saved with the correct pre- and suffix. Empty parts are skipped, the
    # remaining ones are joined by underscores, e.g. "pre_tmpname_suf.pt" or "tmpname.pt".
    name_parts = [part for part in (prefix, "tmpname", suffix) if part != ""]
    expected_file = "_".join(name_parts) + f".{file_ext}"
    assert osp.exists(osp.join(tmpdir, expected_file))

    # Check if sth has been loaded with the correct pre- and suffix
    res = pyrado.load(f"tmpname.{file_ext}", tmpdir, prefix, suffix, obj, verbose)
    assert res is not None
@pytest.mark.parametrize(
    "s",
    [[-1, 0, 1], np.array([[-1, 0, 1]]), np.array([-4, -2, -1, 0])],
)
@pytest.mark.parametrize(
    "d",
    [1, 2],
)
@pytest.mark.parametrize(
    "h",
    [1, 1e-4],
)
def test_diff_coeffs(s, d, h):
    """The numerical differentiation coefficients must sum to zero and the reported order must be positive."""
    result = numerical_differentiation_coeffs(s, d, h)
    coeffs, order = result

    # NOTE(review): this is an exact comparison, i.e. it relies on the coefficients cancelling without rounding error
    coeff_sum = sum(coeffs)
    assert coeff_sum == 0
    assert order > 0
@pytest.mark.parametrize(
    "x, b",
    [
        [iter((1, 2, 3)), True],
        [iter([1, 2, 3]), True],
        [iter(dict(a=1, b=2)), True],
        [(1, 2, 3), False],
        [[1, 2, 3], False],
    ],
)
def test_is_iterator(x, b):
    """Compare the verdict of `is_iterator` with the expected boolean `b`."""
    verdict = is_iterator(x)
    assert verdict == b
@pytest.mark.parametrize(
    "x, b",
    [
        [(1, 2, 3), True],
        [[1, 2, 3], True],
        [dict(a=1, b=2), True],
        [1, False],
    ],
)
def test_is_sequence(x, b):
    """Compare the verdict of `is_sequence` with the expected boolean `b`."""
    verdict = is_sequence(x)
    assert verdict == b
@pytest.mark.parametrize("dtype", ["torch", "numpy"], ids=["to", "np"])
@pytest.mark.parametrize(
    "lb, ub",
    [(0, 1), (-1, 1), (-2.5, 0), (-np.ones((3, 2)), np.ones((3, 2)))],
    ids=["lb0_ub1", "lb-1_ub1", "lb-2.5_ub0", "np_ones"],
)
def test_minmaxscaler(dtype, lb, ub):
    """Scaled data must lie within the bounds of the `MinMaxScaler`, and scaling back must restore the input."""
    for _ in range(10):
        scaler = MinMaxScaler(lb, ub)

        if dtype == "torch":
            x = 1e2 * to.rand(3, 2)
        else:
            x = np.random.rand(3, 2)

        x_scaled = scaler.scale_to(x)
        x_restored = scaler.scale_back(x_scaled)

        # Convert to numpy for the element-wise comparisons
        if isinstance(x_scaled, to.Tensor):
            x_scaled = x_scaled.numpy()
            x_restored = x_restored.numpy()

        template = np.ones_like(x_scaled)
        assert np.all(scaler._bound_lo * template <= x_scaled)
        assert np.all(x_scaled <= scaler._bound_up * template)
        assert np.allclose(x, x_restored)
@pytest.mark.parametrize(
    "x, b",
    [
        [(1, 2, 3), True],
        [(1.0, 2.0, 3.0), True],
        [[1, 2, 3], True],
        [(1, 2, 3.0), False],
        [(1, 2, [3]), False],
    ],
)
def test_check_all_types_equal(x, b):
    """Compare the verdict of `check_all_types_equal` with the expected boolean `b`."""
    verdict = check_all_types_equal(x)
    assert verdict == b
@pytest.mark.parametrize(
    "x, b",
    [
        [([1], [2], [3]), True],
        [([1, 2], [2], [3]), False],
        [(np.array([1]), np.array([2]), np.array([3])), True],
        [(np.array([1, 2]), np.array([2]), np.array([3])), False],
        [(to.tensor([1]), to.tensor([2]), to.tensor([3])), True],
        [(to.tensor([1, 2]), to.tensor([2]), to.tensor([3])), False],
    ],
)
def test_check_all_lengths_equal(x, b):
    """Compare the verdict of `check_all_lengths_equal` with the expected boolean `b`."""
    verdict = check_all_lengths_equal(x)
    assert verdict == b
@pytest.mark.parametrize(
    "x, b",
    [
        [(np.array([1]), np.array([2]), np.array([3])), True],
        [(np.array([1, 2]), np.array([2]), np.array([3])), False],
        [(to.tensor([1]), to.tensor([2]), to.tensor([3])), True],
        [(to.tensor([1, 2]), to.tensor([2]), to.tensor([3])), False],
    ],
)
def test_check_all_shapes_equal(x, b):
    """Compare the verdict of `check_all_shapes_equal` with the expected boolean `b`."""
    verdict = check_all_shapes_equal(x)
    assert verdict == b
@pytest.mark.parametrize(
    "x, b",
    [
        [({"a": 1, "b": "a"}, {"a": 1, "b": "a"}), True],
        [({"a": 2}, {"a": 1}), False],
        [({"a": {"b": 42, "c": 43}}, {"a": {"b": 42, "c": 43}}), True],
        [({"a": np.array([1, 2, 3])}, {"a": np.array([1, 2, 3])}, {"a": np.array([1, 2, 3])}), True],
        [({"a": np.array([1, 2, 3])}, {"a": np.array([1, 2, 8])}), False],
        [({"a": np.array([1, 2, 3])}, {"a": np.array([1, 2, 8]), "c": 42}), False],
        [({"a": {"b": 42, "c": np.array([1, 2, 3])}}, {"a": {"b": 42, "c": np.array([1, 2, 3])}}), True],
    ],
)
def test_check_all_equal(x, b):
    """Compare the verdict of `check_all_equal` on tuples of (nested) dicts with the expected boolean `b`."""
    verdict = check_all_equal(x)
    assert verdict == b
def test_iteration_tracker():
    """Exercise push, pop, peek, get, iteration, and formatting of an `IterationTracker`."""
    it_tracker: IterationTracker = IterationTracker()
    assert isinstance(it_tracker, IterationTracker)

    # Popping from a fresh tracker must fail
    with pytest.raises(IndexError):
        it_tracker.pop()

    # Manual stack handling
    entries = [("meta", 1), ("sub", 42)]
    for label, num in entries:
        it_tracker.push(label, num)
    assert str(it_tracker) == "meta_1-sub_42"
    assert it_tracker.peek() == entries[-1]
    assert list(it_tracker) == entries

    # The entries come back in reverse order of insertion
    for expected in reversed(entries):
        assert it_tracker.pop() == expected

    # Unknown labels yield None
    assert it_tracker.get("magic") is None

    # Stack handling via nested context managers
    with it_tracker.iteration("meta", 1):
        assert it_tracker.peek() == ("meta", 1)

        with it_tracker.iteration("sub", 42):
            assert it_tracker.get("sub") == 42
            assert it_tracker.format() == "meta_1-sub_42"
@pytest.mark.parametrize("x", [np.empty((7,)), np.empty((7, 1)), to.empty((7,)), to.empty((7, 1))])
def test_correct_atleast_2d(x):
    """The first dimension of the corrected data must equal the length of the input."""
    corrected = correct_atleast_2d(x)
    num_rows = corrected.shape[0]
    assert num_rows == len(x)
@pytest.mark.parametrize("base", [dict(a=1, b=np.ones(3), c=dict(c1=2.0, c2=4), y=None, z=26)])
@pytest.mark.parametrize("updater", [dict(a=11, b=np.ones(33), c=dict(c1=22.0), x=13, y=None)])
def test_update_matching_keys_recursively(base, updater):
    """Update a copy of `base` in place with `updater` and check the resulting entries."""
    # Work on a copy such that the parametrized dict is not modified
    target = copy.deepcopy(base)
    update_matching_keys_recursively(target, updater)

    # Entries expected to hold these fixed values after the update
    assert target["y"] is None
    assert target["z"] == 26
    assert target["c"]["c2"] == 4
    assert target.get("x") == 13

    # Entries expected to hold the values of the updater
    assert target["a"] == updater["a"]
    assert np.all(target["b"] == updater["b"])
    assert target["c"]["c1"] == updater["c"]["c1"]
@pytest.mark.parametrize(
    ("transformation", "value", "transformed_value"),
    [
        (LogTransformation(), math.e, 1),
        (LogTransformation(), 0.001, math.log(0.001)),
        (SqrtTransformation(), 4, 2),
        (SqrtTransformation(), 1, 1),
        (SqrtTransformation(), 0, 0),
        (IdentityTransformation(), 5, 5),
        (IdentityTransformation(), 0, 0),
    ],
)
def test_transformations(transformation, value, transformed_value):
    """
    Check the values and the output shapes of the forward and the inverse transformation.

    :param transformation: transformation under test
    :param value: input of the forward transformation
    :param transformed_value: expected output of the forward transformation
    """
    # `pytest.approx` only compares when it is used as an operand of `==`. Asserting the bare object,
    # i.e. `assert pytest.approx(a, b)`, always passes since `b` is interpreted as the relative tolerance.
    assert float(transformation.forward(value)) == pytest.approx(transformed_value)
    # The round trip back to the original value goes through the inverse transformation
    assert float(transformation.inverse(transformed_value)) == pytest.approx(value)

    # The shape must be preserved in both directions, for arrays as well as for tensors
    for fcn in (transformation.forward, transformation.inverse):
        assert fcn(np.array(value)).shape == np.array(value).shape
        # numpy expects the shape as one tuple, a second positional argument would be the dtype
        assert fcn(np.ones((5, 5))).shape == (5, 5)
        assert fcn(to.tensor(value)).shape == to.tensor(value).shape
        assert fcn(to.ones(5, 5)).shape == (5, 5)