environment_wrappers

action_delay

class ActDelayWrapper(wrapped_env: Env, delay: int = 0)[source]

Bases: EnvWrapperAct, Serializable

Environment wrapper which delays actions by a fixed number of time steps.

Constructor

Parameters:
  • wrapped_env – environment to wrap around (only makes sense for simulated environments)

  • delay – integer action delay measured in number of time steps

property delay: int

Get the (rounded) action delay measured in time steps.

reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray[source]

Reset the environment to its initial state and optionally set different domain parameters.

Parameters:
  • init_state – set explicit initial state if not None

  • domain_param – set explicit domain parameters if not None

Return obs:

initial observation of the state.
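A minimal sketch of the delay logic described above (an illustration of the behavior, not the library code): actions are held in a FIFO queue, and the environment executes the action issued delay steps earlier. The queue is pre-filled, here with zeros, for the first steps.

```
import numpy as np
from collections import deque

delay = 2
act_dim = 1
# Pre-fill the queue so the first `delay` steps apply a zero action
queue = deque([np.zeros(act_dim)] * delay)

for t in range(5):
    act = np.array([float(t)])  # action issued by the policy at step t
    queue.append(act)
    applied = queue.popleft()   # action actually applied at step t
    print(f"t={t}: issued {act}, applied {applied}")  # applied lags by 2 steps
```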

action_discrete

class ActDiscreteWrapper(wrapped_env: Env, num_bins: int = 2)[source]

Bases: EnvWrapperAct, Serializable

Environment wrapper that converts a one-dimensional continuous action space into a discrete one.

Constructor

Parameters:
  • wrapped_env – environment to wrap

  • num_bins – number of actions to split the continuous (box) space into
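A sketch of the discretization, assuming the bins are evenly spaced over the box bounds (the exact spacing is an assumption):

```
import numpy as np

lb, ub, num_bins = -2.0, 2.0, 5
discrete_actions = np.linspace(lb, ub, num_bins)
print(discrete_actions)  # [-2. -1.  0.  1.  2.]

# A discrete action index is then mapped back to a continuous value:
idx = 3
print(discrete_actions[idx])  # 1.0
```

With the default num_bins=2, this reduces to a bang-bang controller that can only output the two bounds.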

action_noise

class GaussianActNoiseWrapper(wrapped_env: Env, noise_mean: Optional[Union[float, ndarray]] = None, noise_std: Optional[Union[float, ndarray]] = None)[source]

Bases: EnvWrapperAct, Serializable

Environment wrapper which adds normally distributed i.i.d. noise to all actions. This noise is independent of any action-based exploration strategy that may be applied on top.

Constructor

Parameters:
  • wrapped_env – environment to wrap around (only makes sense for simulations)

  • noise_mean – mean of the noise distribution

  • noise_std – standard deviation of the noise distribution
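A sketch of the per-step perturbation such a wrapper applies (the values are made up):

```
import numpy as np

rng = np.random.default_rng(0)
noise_mean, noise_std = 0.0, 0.05
act = np.array([0.3, -1.2])
act_noisy = act + rng.normal(noise_mean, noise_std, size=act.shape)
print(act_noisy)
```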

action_normalization

class ActNormWrapper(wrapped_env: Env)[source]

Bases: EnvWrapperAct

Environment wrapper which normalizes the action space, such that all action values are in range [-1, 1].

Constructor

Parameters:

wrapped_env – environment to wrap
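A sketch of the affine de-normalization such a wrapper implies: the policy outputs actions in [-1, 1], which are mapped back to the original bounds before being applied.

```
import numpy as np

lb = np.array([-5.0, 0.0])   # original lower action bound
ub = np.array([5.0, 10.0])   # original upper action bound

def denormalize(act_norm):
    # Map an action from [-1, 1] back to [lb, ub]
    return lb + (act_norm + 1.0) * (ub - lb) / 2.0

print(denormalize(np.array([-1.0, 1.0])))  # [-5. 10.]
print(denormalize(np.zeros(2)))            # [0. 5.]
```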

adversarial

class AdversarialDynamicsWrapper(wrapped_env, policy, eps, phi, width=0.25)[source]

Bases: AdversarialWrapper, Serializable

Wrapper to apply adversarial perturbations to the domain parameters (used in ARPL)

Constructor

Parameters:
  • wrapped_env – environment to be wrapped

  • policy – policy to be updated

  • eps – magnitude of perturbation

  • phi – probability of perturbation

  • width – width of distribution to sample from

get_arpl_grad(state: Tensor)[source]
re_adv()[source]
reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray[source]

Reset the environment to its initial state and optionally set different domain parameters.

Parameters:
  • init_state – set explicit initial state if not None

  • domain_param – set explicit domain parameters if not None

Return obs:

initial observation of the state.

step(act: ndarray) tuple[source]

Perform one time step of the simulation. When a terminal condition is met, the reset function is called.

Parameters:

act – action to be taken in the step

Return tuple of obs, reward, done, and info:

  • obs – current observation of the environment

  • reward – reward depending on the selected reward function

  • done – indicates whether the episode has ended

  • env_info – contains diagnostic information about the environment

class AdversarialObservationWrapper(wrapped_env, policy, eps, phi)[source]

Bases: AdversarialWrapper, Serializable

Wrapper to apply adversarial perturbations to the observations (used in ARPL)

Constructor

Parameters:
  • wrapped_env – environment to be wrapped

  • policy – policy to be updated

  • eps – magnitude of perturbation

  • phi – probability of perturbation

get_arpl_grad(state)[source]
step(act: ndarray) tuple[source]

Perform one time step of the simulation. When a terminal condition is met, the reset function is called.

Parameters:

act – action to be taken in the step

Return tuple of obs, reward, done, and info:

  • obs – current observation of the environment

  • reward – reward depending on the selected reward function

  • done – indicates whether the episode has ended

  • env_info – contains diagnostic information about the environment

class AdversarialStateWrapper(wrapped_env: Env, policy: Policy, eps, phi, torch_observation: Optional[Callable] = None)[source]

Bases: AdversarialWrapper, Serializable

Wrapper to apply adversarial perturbations to the state (used in ARPL)

Constructor

Parameters:
  • wrapped_env – environment to be wrapped

  • policy – policy to be updated

  • eps – magnitude of perturbation

  • phi – probability of perturbation

get_arpl_grad(state_tensor, nonobserved)[source]
step(act: ndarray) tuple[source]

Perform one time step of the simulation. When a terminal condition is met, the reset function is called.

Parameters:

act – action to be taken in the step

Return tuple of obs, reward, done, and info:

  • obs – current observation of the environment

  • reward – reward depending on the selected reward function

  • done – indicates whether the episode has ended

  • env_info – contains diagnostic information about the environment

class AdversarialWrapper(wrapped_env, policy, eps, phi)[source]

Bases: EnvWrapper, ABC

Base class for adversarial wrappers (used in ARPL)

Constructor

Parameters:

wrapped_env – environment to wrap

decide_apply()[source]
property phi
static quadratic_loss(action)[source]
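A schematic of the perturbation scheme the eps and phi parameters above suggest (an illustration only, not the library’s get_arpl_grad): with probability phi, shift the state by a step of magnitude eps along a gradient direction.

```
import numpy as np

rng = np.random.default_rng(0)
eps, phi = 0.05, 0.5
state = np.array([0.1, -0.4])
grad = np.array([1.3, -0.7])  # placeholder for the ARPL loss gradient

if rng.uniform() < phi:  # cf. decide_apply()
    state = state + eps * grad / np.linalg.norm(grad)
print(state)
```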

base

class EnvWrapper(wrapped_env: Env)[source]

Bases: Env, Serializable

Base for all environment wrappers. Delegates all environment methods to the wrapped environment.

Constructor

Parameters:

wrapped_env – environment to wrap

property act_space: Space

Get the space of the actions.

close()[source]

Disconnect from the device.

property curr_step: int

Get the number of the current simulation step (0 for the initial step).

property domain_param: dict

These are the environment’s domain parameters, which are synonymous with the parameters used by the simulator to run the physics simulation (e.g., masses, extents, or friction coefficients). The property domain_param includes all parameters that can be perturbed, a.k.a. randomized, but there might also be additional parameters.

property dt

Get the time step size.

forward(value: Union[int, float, ndarray, Tensor]) Union[int, float, ndarray, Tensor][source]

Recursively go through the stack of wrappers and try to apply the forward transformation. This assumes that there is only one such transformation in the stack.

Parameters:

value – domain parameter value in the original space

Returns:

domain parameter value in the transformed space

get_nominal_domain_param() dict[source]

Get the nominal a.k.a. default domain parameters.

Note

This function is used to check which domain parameters exist.

property init_space: Space

Get the initial state space if it exists. Forwards to the wrapped environment.

inverse(value: Union[int, float, ndarray, Tensor]) Union[int, float, ndarray, Tensor][source]

Recursively go through the stack of wrappers and try to apply the inverse transformation. This assumes that there is only one such transformation in the stack.

Parameters:

value – domain parameter value in the transformed space

Returns:

domain parameter value in the original space

property max_steps: Union[int, float]

Get the maximum number of simulation steps.

Note

The step count should always be an integer. Some environments have no maximum number of steps. For these, float('inf') should be used, since it is the only value larger than any int.

Returns:

maximum number of time steps before the environment terminates

property name: str

Get the wrapped environment’s abbreviated name.

property obs_space: Space

Get the space of the observations (agent’s perception of the environment).

property randomizer: Optional[DomainRandomizer]

Get the wrapped environment’s domain randomizer.

render(mode: RenderMode, render_step: int = 1)[source]

Visualize one time step.

Parameters:
  • mode – render mode: console, video, or both

  • render_step – interval for rendering

reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray[source]

Reset the environment to its initial state and optionally set different domain parameters.

Parameters:
  • init_state – set explicit initial state if not None

  • domain_param – set explicit domain parameters if not None

Return obs:

initial observation of the state.

property state: ndarray

Get the state of the wrapped environment.

property state_space: Space

Get the space of the states (used for describing the environment).

step(act: ndarray) tuple[source]

Perform one time step of the simulation. When a terminal condition is met, the reset function is called.

Parameters:

act – action to be taken in the step

Return tuple of obs, reward, done, and info:

  • obs – current observation of the environment

  • reward – reward depending on the selected reward function

  • done – indicates whether the episode has ended

  • env_info – contains diagnostic information about the environment

property supported_domain_param: Iterable

Get an iterable of all supported domain parameters. The default implementation takes the keys of get_nominal_domain_param(). The domain parameters are automatically stored in attributes prefixed with ‘_’.

property task: Task

Get the task describing what the agent should do in the environment.

property wrapped_env: Env

Get the wrapped environment of this wrapper.

class EnvWrapperAct(wrapped_env: Env)[source]

Bases: EnvWrapper

Base class for environment wrappers modifying the action. Override _process_action to pass a modified action vector to the wrapped environment. If necessary, you should also override _process_action_space to report the correct one.

Constructor

Parameters:

wrapped_env – environment to wrap

property act_space: Space

Get the space of the actions.

step(act: ndarray) tuple[source]

Perform one time step of the simulation. When a terminal condition is met, the reset function is called.

Parameters:

act – action to be taken in the step

Return tuple of obs, reward, done, and info:

  • obs – current observation of the environment

  • reward – reward depending on the selected reward function

  • done – indicates whether the episode has ended

  • env_info – contains diagnostic information about the environment

class EnvWrapperObs(wrapped_env: Env)[source]

Bases: EnvWrapper

Base class for environment wrappers modifying the observation. Override _process_obs to pass a modified observation vector to the wrapped environment. If necessary, you should also override _process_obs_space to report the correct one.

Constructor

Parameters:

wrapped_env – environment to wrap

property obs_space: Space

Get the space of the observations (agent’s perception of the environment).

reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray[source]

Reset the environment to its initial state and optionally set different domain parameters.

Parameters:
  • init_state – set explicit initial state if not None

  • domain_param – set explicit domain parameters if not None

Return obs:

initial observation of the state.

step(act: ndarray) tuple[source]

Perform one time step of the simulation. When a terminal condition is met, the reset function is called.

Parameters:

act – action to be taken in the step

Return tuple of obs, reward, done, and info:

  • obs – current observation of the environment

  • reward – reward depending on the selected reward function

  • done – indicates whether the episode has ended

  • env_info – contains diagnostic information about the environment

domain_randomization

class DomainRandWrapper(wrapped_env: Union[SimEnv, EnvWrapper], randomizer: Optional[DomainRandomizer])[source]

Bases: EnvWrapper, Serializable

Base class for environment wrappers which call a DomainRandomizer to randomize the domain parameters

Constructor

Parameters:
  • wrapped_env – environment to wrap

  • randomizer – DomainRandomizer object holding the probability distribution of all randomizable domain parameters; pass None if you want to subclass this wrapper around another DomainRandWrapper and use its randomizer

property randomizer: DomainRandomizer

Get the wrapped environment’s domain randomizer.

class DomainRandWrapperBuffer(wrapped_env, randomizer: Optional[DomainRandomizer], selection: Optional[str] = 'cyclic')[source]

Bases: DomainRandWrapper, Serializable

Domain randomization wrapper which randomizes the wrapped env using a buffer of domain parameter sets. At every call of the reset method, this wrapper cycles through that buffer.

Constructor

Parameters:
  • wrapped_env – environment to wrap around

  • randomizer – DomainRandomizer object that manages the randomization. If None, the buffer has to be set manually; the cyclic reset works the same way in either case.

  • selection – method to draw samples from the buffer, either cyclic or random

property buffer

Get the domain parameter buffer.

fill_buffer(num_domains: int)[source]

Fill the internal buffer with domains.

Parameters:

num_domains – number of randomized domain parameter sets to store in the buffer

reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray[source]

Reset the environment to its initial state and optionally set different domain parameters.

Parameters:
  • init_state – set explicit initial state if not None

  • domain_param – set explicit domain parameters if not None

Return obs:

initial observation of the state.

property ring_idx: int

Get the buffer’s index.

property selection: str

Get the selection method.
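A usage sketch, assuming env (a SimEnv) and randomizer (a DomainRandomizer) already exist:

```
from pyrado.environment_wrappers.domain_randomization import DomainRandWrapperBuffer

env = DomainRandWrapperBuffer(env, randomizer, selection="cyclic")
env.fill_buffer(10)  # pre-sample 10 domain parameter sets

for _ in range(3):
    env.reset()          # each reset advances to the next buffered set
    print(env.ring_idx)  # the buffer index cycles through the buffer
```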

class DomainRandWrapperLive(wrapped_env: Union[SimEnv, EnvWrapper], randomizer: Optional[DomainRandomizer])[source]

Bases: DomainRandWrapper, Serializable

Domain randomization wrapper which randomizes the wrapped env at every reset, so every rollout is done with different domain parameters.

Constructor

Parameters:
  • wrapped_env – environment to wrap

  • randomizer – DomainRandomizer object holding the probability distribution of all randomizable domain parameters; pass None if you want to subclass this wrapper around another DomainRandWrapper and use its randomizer

reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray[source]

Reset the environment to its initial state and optionally set different domain parameters.

Parameters:
  • init_state – set explicit initial state if not None

  • domain_param – set explicit domain parameters if not None

Return obs:

initial observation of the state.

class MetaDomainRandWrapper(wrapped_rand_env: DomainRandWrapper, dp_mapping: Mapping[int, Tuple[str, str]])[source]

Bases: DomainRandWrapper, Serializable

Domain randomization wrapper which wraps another DomainRandWrapper to adapt its parameters, called domain distribution parameters.

Constructor

Parameters:
  • wrapped_rand_env – randomized environment to wrap

  • dp_mapping – mapping from index of the numpy array (coming from the algorithm) to domain parameter name (e.g. mass, length) and the domain distribution parameter (e.g. mean, std)

For the dp_mapping argument, use a dict of this form:
```
m = {0: ('name1', 'parameter_type1'), 1: ('name2', 'parameter_type2')}
```
adapt_randomizer(domain_distr_param_values: ndarray)[source]
property randomizer: DomainRandomizer

Get the wrapped environment’s domain randomizer.

remove_all_dr_wrappers(env: Env, verbose: bool = False)[source]

Go through the environment chain and remove all wrappers of type DomainRandWrapper (and subclasses).

Parameters:
  • env – env chain with domain randomization wrappers

  • verbose – choose if status messages should be printed

Returns:

env chain without domain randomization wrappers

downsampling

class DownsamplingWrapper(wrapped_env: Union[RealEnv, EnvWrapper], factor: int, obs_filter_fcn: Callable = functools.partial(np.mean, axis=0), init_obs: Optional[ndarray] = None)[source]

Bases: EnvWrapperAct, EnvWrapperObs, Serializable

Environment wrapper which downsamples the actions coming from the rollout loop. This wrapper is intended to be used with the real Quanser devices, since these usually run at 500 Hz, i.e. one send-and-receive cycle every 0.002 s. When learning in simulation, this high frequency requires a lot of samples per rollout, which makes learning more time-consuming and difficult (e.g., fine-tuning the temporal discount factor). In order to learn at a lower frequency, e.g. 100 Hz, we downsample the actions passed to the real device, i.e. only send every factor-th action.

Note

The observations are not affected! Thus the policy still receives one observation per environment step, i.e. send and receive at the real device’s sampling frequency.

Constructor

Parameters:
  • wrapped_env – environment to wrap around

  • factor – downsampling factor, i.e. the number of time steps for which every action is repeated

  • obs_filter_fcn – function for processing the observations in the buffer, operates along 0-dimension

  • init_obs – initial observation used to seed the buffer; if None, the buffer is initialized with zero arrays

property factor

Get the downsampling factor.

reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray[source]

Reset the environment to its initial state and optionally set different domain parameters.

Parameters:
  • init_state – set explicit initial state if not None

  • domain_param – set explicit domain parameters if not None

Return obs:

initial observation of the state.
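A usage sketch, assuming env_real is an existing RealEnv running at 500 Hz while the policy was trained at 100 Hz:

```
from pyrado.environment_wrappers.downsampling import DownsamplingWrapper

f_device = 500  # Hz, sampling frequency of the real device
f_policy = 100  # Hz, frequency the policy was trained at
env = DownsamplingWrapper(env_real, factor=f_device // f_policy)  # factor = 5
```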

observation_noise

class GaussianObsNoiseWrapper(wrapped_env: Union[SimEnv, EnvWrapper], noise_std: Union[list, ndarray], noise_mean: Optional[Union[list, ndarray]] = None)[source]

Bases: EnvWrapperObs, Serializable

Environment wrapper which adds normally distributed i.i.d. noise to all observations.

Parameters:
  • wrapped_env – environment to wrap

  • noise_std – list or numpy array for the standard deviation of the noise

  • noise_mean – list or numpy array for the mean of the noise, by default all zeros, i.e. no bias
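A usage sketch, assuming env is an existing SimEnv with a four-dimensional observation space:

```
from pyrado.environment_wrappers.observation_noise import GaussianObsNoiseWrapper

env = GaussianObsNoiseWrapper(
    env,
    noise_std=[0.01, 0.01, 0.05, 0.05],  # one std per observation entry
    noise_mean=None,                     # default: zero mean, i.e. no bias
)
```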

observation_normalization

class ObsNormWrapper(wrapped_env: Union[Env, EnvWrapper], explicit_lb: Optional[Mapping[str, float]] = None, explicit_ub: Optional[Mapping[str, float]] = None)[source]

Bases: EnvWrapperObs, Serializable

Environment wrapper which normalizes the observation space using the bounds from the environment or hard-coded bounds, such that all values are in range [-1, 1]

Constructor

Parameters:
  • wrapped_env – environment to wrap

  • explicit_lb – dict to override the environment’s lower bound; by default (None) this is ignored; the keys are space labels, the values the new bound for that labeled entry

  • explicit_ub – dict to override the environment’s upper bound; by default (None) this is ignored; the keys are space labels, the values the new bound for that labeled entry

static override_bounds(bounds: ndarray, override: Optional[Mapping[str, float]], names: ndarray) ndarray[source]

Override a given bound. This function is useful if some entries of the observation space have an infinite bound and/or you want to specify a certain bound.

Parameters:
  • bounds – bound to override

  • override – value to override with

  • names – label of the bound to override

Returns:

new bound created from a copy of the old bound
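A usage sketch, assuming env has a labeled observation space in which the (hypothetical) entry theta_dot has infinite bounds that plain normalization cannot use:

```
from pyrado.environment_wrappers.observation_normalization import ObsNormWrapper

env = ObsNormWrapper(
    env,
    explicit_lb={"theta_dot": -20.0},  # replace the -inf lower bound
    explicit_ub={"theta_dot": 20.0},   # replace the +inf upper bound
)
```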

class ObsRunningNormWrapper(wrapped_env: Env)[source]

Bases: EnvWrapperObs, Serializable

Environment wrapper which normalizes the observations on the fly using running estimates of the observed bounds, such that all values are in range [-1, 1]

Constructor

Parameters:

wrapped_env – environment to wrap

observation_partial

class ObsPartialWrapper(wrapped_env: Env, mask: Optional[list] = None, idcs: Optional[list] = None, keep_selected: bool = False)[source]

Bases: EnvWrapperObs, Serializable

Environment wrapper which creates a partial observation by masking certain elements

Constructor

Parameters:
  • wrapped_env – environment to wrap

  • mask – mask array; entries with 1 are dropped (the behavior can be inverted via keep_selected=True)

  • idcs – indices to drop, ignored if mask is specified. If the observation space is labeled, the labels can be used as indices.

  • keep_selected – set to true to keep the mask entries with 1/the specified indices and drop the others
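A usage sketch, assuming env is an existing environment with a four-dimensional observation space:

```
from pyrado.environment_wrappers.observation_partial import ObsPartialWrapper

# Drop the last two observation entries (entries marked with 1 are dropped)
env_masked = ObsPartialWrapper(env, mask=[0, 0, 1, 1])

# Equivalent via indices (or labels, if the observation space is labeled)
env_masked = ObsPartialWrapper(env, idcs=[2, 3])
```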

observation_velfilter

class ObsVelFiltWrapper(wrapped_env: SimEnv, mask_pos: Optional[List] = None, idcs_pos: Optional[List] = None, mask_vel: Optional[List] = None, idcs_vel: Optional[List] = None, num: Optional[Tuple] = (50, 0), den: Optional[Tuple] = (1, 50))[source]

Bases: EnvWrapperObs, Serializable

Environment wrapper which computes the velocities from the states using a linear filter.

Constructor

Parameters:
  • wrapped_env – environment to wrap; can only be used on SimEnv since access to the state is needed, and we don’t want to assume that every RealEnv can reconstruct this state from the observations. It wouldn’t make much sense to wrap a RealEnv with this wrapper anyway, since the goal is to mimic the behavior of the real environment’s velocity filter.

  • mask_pos – state mask array to select the position quantities in the state space, entries with 1 are kept

  • idcs_pos – state indices to select, ignored if mask is specified. If the state space is labeled, these labels can be used as indices.

  • mask_vel – observation mask array to select the velocity quantities in the observation space, entries with 1 are kept

  • idcs_vel – velocity observation indices to select, ignored if mask is specified. If the observation space is labeled, these labels can be used as indices.

  • num – continuous-time filter numerator

  • den – continuous-time filter denominator

init_filter(init_state)[source]

Set the initial state of the velocity filter. This is useful when the initial (position) observation has been received and it is non-zero. Otherwise the filter would assume a very high initial velocity.

Parameters:

init_state – initial state to set the filter

reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray[source]

Reset the environment to its initial state and optionally set different domain parameters.

Parameters:
  • init_state – set explicit initial state if not None

  • domain_param – set explicit domain parameters if not None

Return obs:

initial observation of the state.
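The defaults num=(50, 0) and den=(1, 50) specify the continuous-time transfer function G(s) = 50s / (s + 50), i.e. a first-order filtered differentiator ("dirty derivative"). A sketch of discretizing it with SciPy for a 500 Hz loop (the zero-order-hold method here is an assumption, not necessarily what the wrapper uses internally):

```
from scipy import signal

dt = 0.002  # 500 Hz loop
num, den = (50, 0), (1, 50)  # G(s) = 50s / (s + 50)
num_d, den_d, _ = signal.cont2discrete((num, den), dt, method="zoh")
print(num_d, den_d)
```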

state_augmentation

class StateAugmentationWrapper(wrapped_env: Env, domain_param: Optional[Sequence[str]] = None, fixed: bool = False)[source]

Bases: EnvWrapper, Serializable

Augments the observation of the wrapped environment by its physics configuration

Constructor

Parameters:
  • wrapped_env – the environment to be wrapped

  • domain_param – list of domain parameter names to include in the observation, pass None to select all

  • fixed – fix the parameters

property mask
property nominal
property obs_space

Get the space of the observations (agent’s perception of the environment).

property offset
property param
reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray[source]

Reset the environment to its initial state and optionally set different domain parameters.

Parameters:
  • init_state – set explicit initial state if not None

  • domain_param – set explicit domain parameters if not None

Return obs:

initial observation of the state.

set_adv(params)[source]
step(act: ndarray) tuple[source]

Perform one time step of the simulation. When a terminal condition is met, the reset function is called.

Parameters:

act – action to be taken in the step

Return tuple of obs, reward, done, and info:

  • obs – current observation of the environment

  • reward – reward depending on the selected reward function

  • done – indicates whether the episode has ended

  • env_info – contains diagnostic information about the environment
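A usage sketch, assuming env is an existing SimEnv whose domain parameters include mass and length:

```
from pyrado.environment_wrappers.state_augmentation import StateAugmentationWrapper

env = StateAugmentationWrapper(env, domain_param=["mass", "length"])
obs = env.reset()  # obs now ends with the current mass and length values
```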

utils

Some helpers for wrapped environment chains. A ‘chain’ consists of an environment and multiple EnvWrappers wrapping it. The real environment is always at the end of the chain. The modifying methods in this file assume that all EnvWrapper subclasses use Serializable properly, and that their constructors take the wrapped environment as the first positional parameter.

all_envs(env)[source]

Iterates over the environment chain.

Parameters:

env – outermost environment of the chain

Returns:

an iterable over the whole chain from outermost to innermost

attr_env(env, attr)[source]

Locate the first element in the chain that has an attribute of the given name. Returns None if not found.

Parameters:
  • env – outermost environment of the chain

  • attr – attribute name to search

Returns:

the first environment with the given attribute

attr_env_get(env, attr)[source]

Locate the first element in the chain that has an attribute of the given name and return the value of the attribute. Returns None if not found.

Parameters:
  • env – outermost environment of the chain

  • attr – attribute name to search

Returns:

the value of the given attribute, taken from the first environment with the given attribute

inner_env(env)[source]

Returns the innermost (a.k.a. non-wrapper) environment.

Parameters:

env – outermost environment of the chain

Returns:

the innermost environment of the chain

insert_env_before(stack, key_type, insert_type, *args, **kwargs)[source]

Add an EnvWrapper of the given type right before key_type to the environment chain and return the modified chain. The original stack is unmodified, but untouched parts will be shared. If the key is not found, nothing will be done and the original chain is returned.

Parameters:
  • stack – outermost environment of the chain

  • key_type – Type of environment to insert before. May be None to use the innermost environment

  • insert_type – type of environment to insert

  • args – additional args for insert_type’s ctor

  • kwargs – additional kwargs for insert_type’s ctor

Returns:

the modified environment chain

remove_env(stack, key_type)[source]

Remove an EnvWrapper of the given type from the environment chain and return the modified chain. The original stack is unmodified, but untouched parts will be shared. If the key is not found, nothing will be done and the original chain is returned.

Parameters:
  • stack – outermost environment of the chain

  • key_type – type of environment to remove

Returns:

the modified environment chain

typed_env(env, tp)[source]

Locate the first element in the chain that is an instance of the given type. Returns None if not found.

Parameters:
  • env – outermost environment of the chain

  • tp – the environment type to find, see isinstance for possible values.

Returns:

the first environment with the given type
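A usage sketch of inspecting and modifying a chain, assuming env is a SimEnv wrapped by, among others, a GaussianObsNoiseWrapper:

```
from pyrado.environment_wrappers.observation_noise import GaussianObsNoiseWrapper
from pyrado.environment_wrappers.utils import inner_env, remove_env, typed_env

sim = inner_env(env)                              # innermost environment
noise = typed_env(env, GaussianObsNoiseWrapper)   # first wrapper of that type, or None
clean = remove_env(env, GaussianObsNoiseWrapper)  # chain without the noise wrapper
```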
