How to wrap an environment
Let’s first create the basic simulator without any wrappers
import numpy as np
from prettyprinter import pprint
from pyrado.domain_randomization.default_randomizers import create_default_randomizer
from pyrado.environment_wrappers.action_delay import ActDelayWrapper
from pyrado.environment_wrappers.action_noise import GaussianActNoiseWrapper
from pyrado.environment_wrappers.action_normalization import ActNormWrapper
from pyrado.environment_wrappers.domain_randomization import DomainRandWrapperBuffer
from pyrado.environment_wrappers.observation_normalization import ObsNormWrapper
from pyrado.environment_wrappers.observation_partial import ObsPartialWrapper
from pyrado.environment_wrappers.utils import inner_env, remove_env, typed_env
from pyrado.environments.pysim.quanser_cartpole import QCartPoleSwingUpSim
from pyrado.policies.special.dummy import DummyPolicy
from pyrado.sampling.rollout import rollout
from pyrado.utils.data_types import RenderMode
env = QCartPoleSwingUpSim(dt=1/50., max_steps=10)
print(f'dim obs: {env.obs_space.flat_dim}\n'
f'dim state: {env.state_space.flat_dim}\n'
f'dim act: {env.act_space.flat_dim}\n')
The print reveals that we have a 4-dimensional sate space, a 5-dimensional observation space, and a 1-dimensional action space, all of type BoxSpace which is a straightforward continuous space in \(R^n\). Since we are probably here to do some domain randomization, we will start with that. There are different types of randomizers. The DomainRandWrapperLive sets a new set of domain parameters on every reset of the environment. The DomainRandWrapperBuffer maintains a buffer of domain parameter set which we have to fill explicitly. This way, it cycles through a set of domains. There is also the MetaDomainRandWrapper which adapts the randomizer, i.e. changes the distribution according to which the domain parameters are randomized.
randomizer = create_default_randomizer(env)
print(randomizer)
env_r = DomainRandWrapperBuffer(env, randomizer)
env_r.fill_buffer(num_domains=3)
Let’s have a look at the randomized simulation. Due to the synchronized random seed we have the same initial state as well as the same random action. However, the trajectory id not the same since the domains evolve differently. Note that the very first action and reward are only different for logging.
for i in range(4):
rollout(env_r, DummyPolicy(env_r.spec), eval=True, seed=0, render_mode=RenderMode(video=False, text=True))
pprint(env.domain_param, indent=4)
In general, the environments’ individual observation dimensions have very different scales and limits. It is an open secret that one of the most important things for RL to work well are equally scaled actions and observations. Thus we will scale these spaces to [-1, 1] for every dimension. Check the wrappers’ _process_act and _process_obs functions for details. One observation dimension does not have a finite lime. Therefore we need to provide and explicit limit for normalizing. We can also provide explicit bounds to override existing ones.
print(env_r.act_space)
env_rn = ActNormWrapper(env)
print(env_rn.act_space)
print(env_rn.obs_space)
elb = {'x_dot': -213., 'theta_dot': -42.}
eub = {'x_dot': 213., 'theta_dot': 42., 'x': 0.123}
env_rn = ObsNormWrapper(env_rn, explicit_lb=elb, explicit_ub=eub)
print(env_rn.obs_space)
So if we now do a rollout, we can see the effect of the normalization
ro_r = rollout(env_r, DummyPolicy(env_r.spec), eval=True, seed=0, render_mode=RenderMode())
ro_rn = rollout(env_rn, DummyPolicy(env_rn.spec), eval=True, seed=0, render_mode=RenderMode())
np.set_printoptions(precision=3)
np.set_printoptions(suppress=True)
print(f'observations without normalization:\n{ro_r.observations}')
print(f'observations with normalization:\n{ro_rn.observations}')
assert np.allclose(env_rn._process_obs(ro_r.observations), ro_rn.observations)
In case we want to mask some observations from wer policy, e.g. if the real system does not observe a quantity that is available during simulation, we can mask them out using the ObsPartialWrapper. This wrapper can mask using an array of zeros and ones, or by passing a list of the exact labels.
env_rnp = ObsPartialWrapper(env_rn, idcs=['x_dot', 'cos_theta'])
print(env_rnp.obs_space)
ro_rnp = rollout(env_rnp, DummyPolicy(env_rnp.spec), eval=True, seed=0, render_mode=RenderMode())
print(f'partial observations with normalization:\n{ro_rnp.observations}')
We can also apply wrappers that apply additional noise to the action (GaussianActNoiseWrapper) or observations (GaussianObsNoiseWrapper). The action wrappers will not modify the `action filed in the recorded rollout, since this one is capturing the action as they are commanded by the policy.
env_rnpa = GaussianActNoiseWrapper(env_rnp,
noise_mean=0.5*np.ones(env_rnp.act_space.shape),
noise_std=0.1*np.ones(env_rnp.act_space.shape))
ro_rnpa = rollout(env_rnpa, DummyPolicy(env_rnpa.spec), eval=True, seed=0, render_mode=RenderMode())
assert np.allclose(ro_rnp.actions, ro_rnpa.actions)
assert not np.allclose(ro_rnp.observations, ro_rnpa.observations)
Real-world devices often have delays. One way to model this effect is by artificially hold back the current action for a given number of time steps. Again, the modified action fields in the recorded rollouts are the same. Have a look at the printed actions a_t as well as next state s_t+1
ro_rnp = rollout(env_rnp, DummyPolicy(env_rnp.spec), eval=True, seed=0, render_mode=RenderMode(text=True)) # redo for visual comparison
env_rnpd = ActDelayWrapper(env_rnp, delay=3)
ro_rnpd = rollout(env_rnpd, DummyPolicy(env_rnpd.spec), eval=True, seed=0, render_mode=RenderMode(text=True))
assert np.allclose(ro_rnp.actions, ro_rnpd.actions)
assert not np.allclose(ro_rnp.observations, ro_rnpd.observations)
There are also very handy utils to manage chains of wrappers. Examples are`inner_env()` which yields the core environment, typed_env() which yields the first element of the chain equal to the provided type, or remove_env which removes the first element of the chain equal to the provided type.
assert isinstance(inner_env(env_rnpd), QCartPoleSwingUpSim)
assert typed_env(env_rnpd, ObsPartialWrapper) is not None
assert isinstance(env_rnpd, ActDelayWrapper)
env_rnpdr = remove_env(env_rnpd, ActDelayWrapper)
assert not isinstance(env_rnpdr, ActDelayWrapper)
Finally, the most important lesson: the order in which we apply the environment wrappers matters! For example, applying the ObsNormWrapper after the ObsPartialWrapper will not give we the intended result (due to the implementation). Another example is the order of ObsNormWrapper and GaussianObsNoiseWrapper.