environment_wrappers
action_delay
- class ActDelayWrapper(wrapped_env: Env, delay: int = 0)[source]
Bases:
EnvWrapperAct
,Serializable
Environment wrapper which delays actions by a fixed number of time steps.
Constructor
- Parameters:
wrapped_env – environment to wrap around (only makes sense from simulation environments)
delay – integer action delay measured in number of time steps
- property delay: int
Get the (rounded) action delay measured in time steps.
- reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray [source]
Reset the environment to its initial state and optionally set different domain parameters.
- Parameters:
init_state – set explicit initial state if not None
domain_param – set explicit domain parameters if not None
- Return obs:
initial observation of the state.
action_discrete
- class ActDiscreteWrapper(wrapped_env: Env, num_bins: int = 2)[source]
Bases:
EnvWrapperAct
,Serializable
Environment wrapper that converts a one-dimensional continuous into a discrete action space.
Constructor
- Parameters:
wrapped_env – environment to wrap
num_bins – number of actions to split the continuous (box) space into
action_noise
- class GaussianActNoiseWrapper(wrapped_env: Env, noise_mean: Optional[Union[float, ndarray]] = None, noise_std: Optional[Union[float, ndarray]] = None)[source]
Bases:
EnvWrapperAct
,Serializable
Environment wrapper which adds normally distributed i.i.d. noise to all action. This noise is independent for the potentially applied action-based exploration strategy.
Constructor
- Parameters:
wrapped_env – environment to wrap around (only makes sense for simulations)
noise_mean – mean of the noise distribution
noise_std – standard deviation of the noise distribution
action_normalization
- class ActNormWrapper(wrapped_env: Env)[source]
Bases:
EnvWrapperAct
Environment wrapper which normalizes the action space, such that all action values are in range [-1, 1].
Constructor
- Parameters:
wrapped_env – environment to wrap
adversarial
- class AdversarialDynamicsWrapper(wrapped_env, policy, eps, phi, width=0.25)[source]
Bases:
AdversarialWrapper
,Serializable
“ Wrapper to apply adversarial perturbations to the domain parameters (used in ARPL)
Constructor
- Parameters:
wrapped_env – environemnt to be wrapped
policy – policy to be updated
eps – magnitude of perturbation
phi – probability of perturbation
width – width of distribution to sample from
- reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray [source]
Reset the environment to its initial state and optionally set different domain parameters.
- Parameters:
init_state – set explicit initial state if not None
domain_param – set explicit domain parameters if not None
- Return obs:
initial observation of the state.
- step(act: ndarray) tuple [source]
Perform one time step of the simulation. When a terminal condition is met, the reset function is called.
- Parameters:
act – action to be taken in the step
- Return tuple of obs, reward, done, and info:
obs : current observation of the environment reward: reward depending on the selected reward function done: indicates whether the episode has ended env_info: contains diagnostic information about the environment
- class AdversarialObservationWrapper(wrapped_env, policy, eps, phi)[source]
Bases:
AdversarialWrapper
,Serializable
“ Wrapper to apply adversarial perturbations to the observations (used in ARPL)
Constructor
- Parameters:
wrapped_env – environment to be wrapped
policy – policy to be updated
eps – magnitude of perturbation
phi – probability of perturbation
- step(act: ndarray) tuple [source]
Perform one time step of the simulation. When a terminal condition is met, the reset function is called.
- Parameters:
act – action to be taken in the step
- Return tuple of obs, reward, done, and info:
obs : current observation of the environment reward: reward depending on the selected reward function done: indicates whether the episode has ended env_info: contains diagnostic information about the environment
- class AdversarialStateWrapper(wrapped_env: ~pyrado.environments.base.Env, policy: ~pyrado.policies.base.Policy, eps: <module 'numpy.core.numeric' from '/home/user/miniconda3/envs/pyrado/lib/python3.7/site-packages/numpy/core/numeric.py'>, phi, torch_observation: ~typing.Optional[~typing.Callable] = None)[source]
Bases:
AdversarialWrapper
,Serializable
“ Wrapper to apply adversarial perturbations to the state (used in ARPL)
Constructor
- Parameters:
wrapped_env – environment to be wrapped
policy – policy to be updated
eps – magnitude of perturbation
phi – probability of perturbation
- step(act: ndarray) tuple [source]
Perform one time step of the simulation. When a terminal condition is met, the reset function is called.
- Parameters:
act – action to be taken in the step
- Return tuple of obs, reward, done, and info:
obs : current observation of the environment reward: reward depending on the selected reward function done: indicates whether the episode has ended env_info: contains diagnostic information about the environment
base
- class EnvWrapper(wrapped_env: Env)[source]
Bases:
Env
,Serializable
Base for all environment wrappers. Delegates all environment methods to the wrapped environment.
Constructor
- Parameters:
wrapped_env – environment to wrap
- property curr_step: int
Get the number of the current simulation step (0 for the initial step).
- property domain_param: dict
These are the environment’s domain parameters, which are synonymous to the parameters used by the simulator to run the physics simulation (e.g., masses, extents, or friction coefficients). The property domain_param includes all parameters that can be perturbed a.k.a. randomized, but there might also be additional parameters.
- property dt
Get the time step size.
- forward(value: Union[int, float, ndarray, Tensor]) Union[int, float, ndarray, Tensor] [source]
Recursively go though the stack of wrappers and try to apply the forward transformation. This assumes that there is only one.
- Parameters:
value – domain parameter value in the original space
- Returns:
domain parameter value in the transformed space
- get_nominal_domain_param() dict [source]
Get the nominal a.k.a. default domain parameters.
Note
This function is used to check which domain parameters exist.
- property init_space: Space
Get the initial state space if it exists. Forwards to the wrapped environment.
- inverse(value: Union[int, float, ndarray, Tensor]) Union[int, float, ndarray, Tensor] [source]
Recursively go though the stack of wrappers and try to apply the inverse transformation. This assumes that there is only one.
- Parameters:
value – domain parameter value in the transformed space
- Returns:
domain parameter value in the original space
- property max_steps: Union[int, float]
Get the maximum number of simulation steps.
Note
The step count should always be an integer. Some environments have no maximum step size. For these, float(‘Inf’) should be used, since it is the only value larger then any int.
- Returns:
maximum number of time steps before the environment terminates
- property name: str
Get the wrapped environment’s abbreviated name.
- property obs_space: Space
Get the space of the observations (agent’s perception of the environment).
- property randomizer: Optional[DomainRandomizer]
Get the wrapped environment’s domain randomizer.
- render(mode: RenderMode, render_step: int = 1)[source]
Visualize one time step.
- Parameters:
mode – render mode: console, video, or both
render_step – interval for rendering
- reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray [source]
Reset the environment to its initial state and optionally set different domain parameters.
- Parameters:
init_state – set explicit initial state if not None
domain_param – set explicit domain parameters if not None
- Return obs:
initial observation of the state.
- property state: ndarray
Get the state of the wrapped environment.
- step(act: ndarray) tuple [source]
Perform one time step of the simulation. When a terminal condition is met, the reset function is called.
- Parameters:
act – action to be taken in the step
- Return tuple of obs, reward, done, and info:
obs : current observation of the environment reward: reward depending on the selected reward function done: indicates whether the episode has ended env_info: contains diagnostic information about the environment
- property supported_domain_param: Iterable
Get an iterable of all supported domain parameters. The default implementation takes the keys of get_nominal_domain_param(). The domain parameters are automatically stored in attributes prefixed with ‘_’.
- class EnvWrapperAct(wrapped_env: Env)[source]
Bases:
EnvWrapper
Base class for environment wrappers modifying the action. Override _process_action to pass a modified action vector to the wrapped environment. If necessary, you should also override _process_action_space to report the correct one.
Constructor
- Parameters:
wrapped_env – environment to wrap
- step(act: ndarray) tuple [source]
Perform one time step of the simulation. When a terminal condition is met, the reset function is called.
- Parameters:
act – action to be taken in the step
- Return tuple of obs, reward, done, and info:
obs : current observation of the environment reward: reward depending on the selected reward function done: indicates whether the episode has ended env_info: contains diagnostic information about the environment
- class EnvWrapperObs(wrapped_env: Env)[source]
Bases:
EnvWrapper
Base class for environment wrappers modifying the observation. Override _process_obs to pass a modified observation vector to the wrapped environment. If necessary, you should also override _process_obs_space to report the correct one.
Constructor
- Parameters:
wrapped_env – environment to wrap
- property obs_space: Space
Get the space of the observations (agent’s perception of the environment).
- reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray [source]
Reset the environment to its initial state and optionally set different domain parameters.
- Parameters:
init_state – set explicit initial state if not None
domain_param – set explicit domain parameters if not None
- Return obs:
initial observation of the state.
- step(act: ndarray) tuple [source]
Perform one time step of the simulation. When a terminal condition is met, the reset function is called.
- Parameters:
act – action to be taken in the step
- Return tuple of obs, reward, done, and info:
obs : current observation of the environment reward: reward depending on the selected reward function done: indicates whether the episode has ended env_info: contains diagnostic information about the environment
domain_randomization
- class DomainRandWrapper(wrapped_env: Union[SimEnv, EnvWrapper], randomizer: Optional[DomainRandomizer])[source]
Bases:
EnvWrapper
,Serializable
Base class for environment wrappers which call a DomainRandomizer to randomize the domain parameters
Constructor
- Parameters:
wrapped_env – environment to wrap
randomizer – DomainRandomizer object holding the probability distribution of all randomizable domain parameters, pass None if you want to subclass wrapping another DomainRandWrapper and use its randomizer
- property randomizer: DomainRandomizer
Get the wrapped environment’s domain randomizer.
- class DomainRandWrapperBuffer(wrapped_env, randomizer: Optional[DomainRandomizer], selection: Optional[str] = 'cyclic')[source]
Bases:
DomainRandWrapper
,Serializable
Domain randomization wrapper which randomized the wrapped env using a buffer of domain parameter sets. At every call of the reset method this wrapper cycles through that buffer.
Constructor
- Parameters:
wrapped_env – environment to wrap around
randomizer – DomainRandomizer object that manages the randomization. If None, the user has to set the buffer manually, the circular reset however works the same way
selection – method to draw samples from the buffer, either cyclic or random
- property buffer
Get the domain parameter buffer.
- fill_buffer(num_domains: int)[source]
Fill the internal buffer with domains.
- Parameters:
num_domains – number of randomized domain parameter sets to store in the buffer
- reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray [source]
Reset the environment to its initial state and optionally set different domain parameters.
- Parameters:
init_state – set explicit initial state if not None
domain_param – set explicit domain parameters if not None
- Return obs:
initial observation of the state.
- property ring_idx: int
Get the buffer’s index.
- property selection: str
Get the selection method.
- class DomainRandWrapperLive(wrapped_env: Union[SimEnv, EnvWrapper], randomizer: Optional[DomainRandomizer])[source]
Bases:
DomainRandWrapper
,Serializable
Domain randomization wrapper which randomized the wrapped env at every reset. Thus every rollout is done with different domain parameters.
Constructor
- Parameters:
wrapped_env – environment to wrap
randomizer – DomainRandomizer object holding the probability distribution of all randomizable domain parameters, pass None if you want to subclass wrapping another DomainRandWrapper and use its randomizer
- reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray [source]
Reset the environment to its initial state and optionally set different domain parameters.
- Parameters:
init_state – set explicit initial state if not None
domain_param – set explicit domain parameters if not None
- Return obs:
initial observation of the state.
- class MetaDomainRandWrapper(wrapped_rand_env: DomainRandWrapper, dp_mapping: Mapping[int, Tuple[str, str]])[source]
Bases:
DomainRandWrapper
,Serializable
Domain randomization wrapper which wraps another DomainRandWrapper to adapt its parameters, called domain distribution parameters.
Constructor
- Parameters:
wrapped_rand_env – randomized environment to wrap
dp_mapping – mapping from index of the numpy array (coming from the algorithm) to domain parameter name (e.g. mass, length) and the domain distribution parameter (e.g. mean, std)
# For the mapping arg use the this dict constructor ``` m = {0: ('name1', 'parameter_type1'), 1: ('name2', 'parameter_type2')} ```
- property randomizer: DomainRandomizer
Get the wrapped environment’s domain randomizer.
- remove_all_dr_wrappers(env: Env, verbose: bool = False)[source]
Go through the environment chain and remove all wrappers of type DomainRandWrapper (and subclasses).
- Parameters:
env – env chain with domain randomization wrappers
verbose – choose if status messages should be printed
- Returns:
env chain without domain randomization wrappers
downsampling
- class DownsamplingWrapper(wrapped_env: [<class 'pyrado.environments.real_base.RealEnv'>, <class 'pyrado.environment_wrappers.base.EnvWrapper'>], factor: int, obs_filter_fcn: callable = functools.partial(<function mean>, axis=0), init_obs: ~typing.Optional[~numpy.ndarray] = None)[source]
Bases:
EnvWrapperAct
,EnvWrapperObs
,Serializable
Environment wrapper which downsamples the actions coming from the rollout loop. This wrapper is intended to be used with the real Quanser devices, since these are set up to usually run on 500Hz, i.e. one send and receive every 0.002s. When learning in simulation, this requires a lot of samples per rollout, which makes learning more time-consuming and difficult (fine tuning the temporal discount factor). In order to be able to learn on a lower frequency, e.g. 100Hz, we downsample the actions passed to the real device, i.e. just send every factor action.
Note
The observations are not affected! Thus the policy still receives one observation per environment step, i.e. send and receive at the real device’s sampling frequency.
Constructor
- Parameters:
wrapped_env – environment to wrap around
factor – downsampling factor i.e. number of time steps for which every action should be repeated
obs_filter_fcn – function for processing the observations in the buffer, operates along 0-dimension
init_obs – initial observation to see the buffer, if None the buffer is initialized with zero arrays
- property factor
Get the downsampling factor.
- reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray [source]
Reset the environment to its initial state and optionally set different domain parameters.
- Parameters:
init_state – set explicit initial state if not None
domain_param – set explicit domain parameters if not None
- Return obs:
initial observation of the state.
observation_noise
- class GaussianObsNoiseWrapper(wrapped_env: Union[SimEnv, EnvWrapper], noise_std: Union[list, ndarray], noise_mean: Optional[Union[list, ndarray]] = None)[source]
Bases:
EnvWrapperObs
,Serializable
Environment wrapper which adds normally distributed i.i.d. noise to all observations.
- Parameters:
wrapped_env – environment to wrap
noise_std – list or numpy array for the standard deviation of the noise
noise_mean – list or numpy array for the mean of the noise, by default all zeros, i.e. no bias
observation_normalization
- class ObsNormWrapper(wrapped_env: Union[Env, EnvWrapper], explicit_lb: Optional[Mapping[str, float]] = None, explicit_ub: Optional[Mapping[str, float]] = None)[source]
Bases:
EnvWrapperObs
,Serializable
Environment wrapper which normalizes the observation space using the bounds from the environment or hard-coded bounds, such that all values are in range [-1, 1]
Constructor
- Parameters:
wrapped_env – environment to wrap
explicit_lb – dict to override the environment’s lower bound; by default (None) this is ignored; the keys are space labels, the values the new bound for that labeled entry
explicit_ub – dict to override the environment’s upper bound; by default (None) this is ignored; the keys are space labels, the values the new bound for that labeled entry
- static override_bounds(bounds: ndarray, override: Optional[Mapping[str, float]], names: ndarray) ndarray [source]
Override a given bound. This function is useful if some entries of the observation space have an infinite bound and/or you want to specify a certain bound
- Parameters:
bounds – bound to override
override – value to override with
names – label of the bound to override
- Returns:
new bound created from a copy of the old bound
- class ObsRunningNormWrapper(wrapped_env: Env)[source]
Bases:
EnvWrapperObs
,Serializable
Environment wrapper which normalizes the observation space using the bounds from the environment or hard-coded bounds, such that all values are in range [-1, 1]
Constructor
- Parameters:
wrapped_env – environment to wrap
observation_partial
- class ObsPartialWrapper(wrapped_env: Env, mask: Optional[list] = None, idcs: Optional[list] = None, keep_selected: bool = False)[source]
Bases:
EnvWrapperObs
,Serializable
Environment wrapper which creates a partial observation by masking certain elements
Constructor
- Parameters:
wrapped_env – environment to wrap
mask – mask out array, entries with 1 are dropped (behavior can be inverted by keep_selected=True)
idcs – indices to drop, ignored if mask is specified. If the observation space is labeled, the labels can be used as indices.
keep_selected – set to true to keep the mask entries with 1/the specified indices and drop the others
observation_velfilter
- class ObsVelFiltWrapper(wrapped_env: SimEnv, mask_pos: Optional[List] = None, idcs_pos: Optional[List] = None, mask_vel: Optional[List] = None, idcs_vel: Optional[List] = None, num: Optional[Tuple] = (50, 0), den: Optional[Tuple] = (1, 50))[source]
Bases:
EnvWrapperObs
,Serializable
Environment wrapper which computes the velocities from the satets given a linear filter
Constructor
- Parameters:
wrapped_env – environment to wrap, can only be used on SimEnv since access to the state is needed, and we don’t want to assume that all RealEnv can reconstruct this state from the observations. It wouldn’t make much sense to wrap a RealEnv with this wrapper anyway, since the goal it to mimic the behavior of the real environments velocity filter.
mask_pos – state mask array to select the position quantities in the state space, entries with 1 are kept
idcs_pos – state indices to select, ignored if mask is specified. If the state space is labeled, these labels can be used as indices.
mask_vel – observation mask array to select the velocity quantities in the observation space, entries with 1 are kept
idcs_vel – velocity observation indices to select, ignored if mask is specified. If the observation space is labeled, these labels can be used as indices.
num – continuous-time filter numerator
den – continuous-time filter denominator
- init_filter(init_state)[source]
Set the initial state of the velocity filter. This is useful when the initial (position) observation has been received and it is non-zero. Otherwise the filter would assume a very high initial velocity.
- Parameters:
init_state – initial state to set the filter
- reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray [source]
Reset the environment to its initial state and optionally set different domain parameters.
- Parameters:
init_state – set explicit initial state if not None
domain_param – set explicit domain parameters if not None
- Return obs:
initial observation of the state.
state_augmentation
- class StateAugmentationWrapper(wrapped_env: Env, domain_param: Optional[Sequence[str]] = None, fixed: bool = False)[source]
Bases:
EnvWrapper
,Serializable
Augments the observation of the wrapped environment by its physics configuration
Constructor
- Parameters:
wrapped_env – the environment to be wrapped
domain_param – list of domain parameter names to include in the observation, pass None to select all
fixed – fix the parameters
- property mask
- property nominal
- property obs_space
Get the space of the observations (agent’s perception of the environment).
- property offset
- property param
- reset(init_state: Optional[ndarray] = None, domain_param: Optional[dict] = None) ndarray [source]
Reset the environment to its initial state and optionally set different domain parameters.
- Parameters:
init_state – set explicit initial state if not None
domain_param – set explicit domain parameters if not None
- Return obs:
initial observation of the state.
- step(act: ndarray) tuple [source]
Perform one time step of the simulation. When a terminal condition is met, the reset function is called.
- Parameters:
act – action to be taken in the step
- Return tuple of obs, reward, done, and info:
obs : current observation of the environment reward: reward depending on the selected reward function done: indicates whether the episode has ended env_info: contains diagnostic information about the environment
utils
Some helpers for wrapped environment chains. A ‘chain’ consists of an environment and multiple EnvWrappers wrapping it. The real environment is always at the end of the chain. The modifying methods in this file assume that all EnvWrapper subclasses use Serializable properly, and that it’s ctor takes the wrapped environment as first positional parameter.
- all_envs(env)[source]
Iterates over the environment chain.
- Parameters:
env – outermost environment of the chain
- Returns:
an iterable over the whole chain from outermost to innermost
- attr_env(env, attr)[source]
Locate the first element in the chain that has an attribute of the given name. Returns None if not found.
- Parameters:
env – outermost environment of the chain
attr – attribute name to search
- Returns:
the first environment with the given attribute
- attr_env_get(env, attr)[source]
Locate the first element in the chain that has an attribute of the given name and return the value of the attribute. Returns None if not found.
- Parameters:
env – outermost environment of the chain
attr – attribute name to search
- Returns:
the value of the given attribute, taken from the first environment with the given attribute
- inner_env(env)[source]
Returns the innermost (a.k.a. non-wrapper) environment.
- Parameters:
env – outermost environment of the chain
- Returns:
the innermost environment of the chain
- insert_env_before(stack, key_type, insert_type, *args, **kwargs)[source]
Add an EnvWrapper of the given type right before key_type to the environment chain and return the modified chain. The original stack is unmodified, but untouched parts will be shared. If the key is not found, nothing will be done and the original chain is returned.
- Parameters:
stack – outermost environment of the chain
key_type – Type of environment to insert before. May be None to use the innermost environment
insert_type – type of environment to insert
args – additional args for insert_type’s ctor
kwargs – additional kwargs for insert_type’s ctor
- Returns:
the modified environment chain
- remove_env(stack, key_type)[source]
Remove an EnvWrapper of the given type from the environment chain and return the modified chain. The original stack is unmodified, but untouched parts will be shared. If the key is not found, nothing will be done and the original chain is returned.
- Parameters:
stack – outermost environment of the chain
key_type – type of environment to remove
- Returns:
the modified environment chain
- typed_env(env, tp)[source]
Locate the first element in the chain that is an instance of the given type. Returns None if not found.
- Parameters:
env – outermost environment of the chain
tp – the environment type to find, see isinstance for possible values.
- Returns:
the first environment with the given type