tasks
base
- class Task[source]
Bases:
ABC
Base class for all tasks in Pyrado. A task contains a desired state, a reward function, and a step function. The task also checks if the environment is done. Every environment should have exactly one task at a time.
- compute_final_rew(state: ndarray, remaining_steps: int) float [source]
Compute the final reward, e.g. bonus for success or a malus for failure, for a single task.
Note
This function should only be overwritten by tasks that manipulate the final reward.
- Parameters:
state – current state
remaining_steps – number of time steps left in the episode
- Returns:
final reward
- final_rew(state: ndarray, remaining_steps: int) float [source]
Get the final reward, e.g. bonus for success or a malus for failure. This function loops through all tasks (unfolding the wrappers) and calls their compute_final_rew method.
- Parameters:
state – current state forwarded to compute_final_rew
remaining_steps – number of time steps left in the episode forwarded to compute_final_rew
- Return rew:
summed final reward
- has_failed(state: ndarray) bool [source]
Check whether the agent has failed. The default implementation checks if the state is out of bounds.
- Parameters:
state – the environment's current state
- Returns:
True if failed
- abstract has_succeeded(state: ndarray) bool [source]
Check whether the agent has succeeded.
- Parameters:
state – the environment's current state
- Returns:
True if succeeded
- is_done(state: ndarray) bool [source]
Check if a final state is reached.
Note
It is strongly recommended to call this method every environment step.
- Parameters:
state – current state
- Return done:
done flag
- reset(**kwargs: Any)[source]
Reset the task. Since the environment specification may change at every reset of the environment, we have to reset the task. This might also include resetting the members of the reward function if there are any.
- Parameters:
kwargs – optional arguments e.g. environment specification or new desired state
- property space_des: Space
Get the desired state space (same dimensions as the environment’s state space). Only override this if the task has a desired space.
- property state_des: ndarray
Get the desired state (same dimensions as the environment’s state). Only override this if the task has a desired state.
- abstract step_rew(state: ndarray, act: ndarray, remaining_steps: int) float [source]
Get the step reward, e.g. from a function of the states and actions.
Note
It is strongly recommended to call this method every environment step.
- Parameters:
state – current state
act – current action
remaining_steps – number of time steps left in the episode
- Return rew:
current reward
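A minimal sketch of a custom task built on this base class (illustrative only; the subclass name is hypothetical, the module path pyrado.tasks.base is assumed from the section layout, and the EnvSpec is assumed to come from an existing environment via env.spec; if the base class declares further abstract members, they need to be overridden as well):

    import numpy as np
    from pyrado.tasks.base import Task  # module path assumed

    class ReachOriginTask(Task):
        # Hypothetical task: drive the state toward the origin
        def __init__(self, env_spec):
            self._env_spec = env_spec  # EnvSpec, e.g. obtained from an environment via env.spec (assumed)

        @property
        def env_spec(self):
            # Return the stored environment specification (assuming the base class expects this property)
            return self._env_spec

        def has_succeeded(self, state: np.ndarray) -> bool:
            # Succeed once the state is close to the origin
            return np.linalg.norm(state) < 1e-2

        def step_rew(self, state: np.ndarray, act: np.ndarray, remaining_steps: int) -> float:
            # Negative quadratic cost on state and action
            return float(-state @ state - 1e-3 * act @ act)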
- class TaskWrapper(wrapped_task: Task)[source]
Bases:
Task
Base for all task wrappers. Delegates all environment methods to the wrapped environment.
Constructor
- Parameters:
wrapped_task – task to wrap
- has_failed(state: ndarray) bool [source]
Check whether the agent has failed. The default implementation checks if the state is out of bounds.
- Parameters:
state – the environment's current state
- Returns:
True if failed
- has_succeeded(state: ndarray) bool [source]
Check whether the agent has succeeded.
- Parameters:
state – the environment's current state
- Returns:
True if succeeded
- is_done(state: ndarray) bool [source]
Check if a final state is reached.
Note
It is strongly recommended to call this method every environment step.
- Parameters:
state – current state
- Return done:
done flag
- reset(**kwargs)[source]
Reset the task. Since the environment specification may change at every reset of the environment, we have to reset the task. This might also include resetting the members of the reward function if there are any.
- Parameters:
kwargs – optional arguments e.g. environment specification or new desired state
- property space_des: Space
Get the desired state (same dimensions as the environment’s state). Only override this if the task has a desired state.
- property state_des: ndarray
Get the desired state (same dimensions as the environment’s state). Only override this if the task has a desired state.
- step_rew(state: ndarray, act: ndarray, remaining_steps: int) float [source]
Get the step reward, e.g. from a function of the states and actions.
Note
It is strongly recommended to call this method every environment step.
- Parameters:
state – current state
act – current action
remaining_steps – number of time steps left in the episode
- Return rew:
current reward
- property wrapped_task
condition_only
- class ConditionOnlyTask(env_spec: EnvSpec, condition_fcn: Callable, is_success_condition: bool)[source]
Bases:
Task
Task class which yields zero reward at every time step and terminates once the given condition function returns True. This class is intended to be wrapped by FinalRewTask.
Constructor
- Usage:
task = FinalRewTask( ConditionOnlyTask(<some EnvSpec>, <some Callable>, <True or False>), mode=FinalRewMode(time_dependent=True) )
- Parameters:
env_spec – environment specification of a simulated or real environment
condition_fcn – function to determine if the task was solved, by default (None) this task runs endlessly
is_success_condition – if True the condition_fcn returns True for a success, if False the condition_fcn returns True for a failure
- has_failed(state: ndarray) bool [source]
Check whether the agent has failed. The default implementation checks if the state is out of bounds.
- Parameters:
state – the environment's current state
- Returns:
True if failed
- has_succeeded(state: ndarray) bool [source]
Check whether the agent has succeeded.
- Parameters:
state – the environment's current state
- Returns:
True if succeeded
- reset(env_spec: EnvSpec, condition_fcn: Optional[Callable] = None, is_success_condition: Optional[bool] = None, **kwargs)[source]
Reset the task.
- Parameters:
env_spec – environment specification
condition_fcn – function to determine if the task was solved, by default (None) this task runs endlessly
is_success_condition – if True the condition_fcn returns True for a success, if False the condition_fcn returns True for a failure
- step_rew(state: Optional[ndarray] = None, act: Optional[ndarray] = None, remaining_steps: Optional[int] = None) float [source]
Get the step reward, e.g. from a function of the states and actions.
Note
It is strongly recommended to call this method every environment step.
- Parameters:
state – current state
act – current action
remaining_steps – number of time steps left in the episode
- Return rew:
current reward
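A concrete variant of the usage pattern above (hedged sketch; module paths are assumed from the section layout, env.spec is assumed to provide the EnvSpec, and the condition function is assumed to receive the current state):

    from pyrado.tasks.condition_only import ConditionOnlyTask  # module path assumed
    from pyrado.tasks.final_reward import FinalRewMode, FinalRewTask  # module path assumed

    spec = env.spec  # EnvSpec of an existing environment (assumed)
    # Terminate (successfully) once the first state dimension exceeds 1.0; the wrapping
    # FinalRewTask then converts the remaining steps into a time-dependent final bonus.
    task = FinalRewTask(
        ConditionOnlyTask(spec, condition_fcn=lambda state: state[0] > 1.0, is_success_condition=True),
        mode=FinalRewMode(time_dependent=True),
    )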
desired_space
- class DesSpaceTask(env_spec: ~pyrado.utils.data_types.EnvSpec, space_des: ~pyrado.spaces.base.Space, rew_fcn: ~pyrado.tasks.reward_functions.RewFcn = <class 'pyrado.tasks.reward_functions.ZeroPerStepRewFcn'>)[source]
Bases:
Task
Task class for moving to a (fixed) desired state space. This task is designed with the idea in mind that it is only important if the state is in the desired (sub)space. If the state is in the desired space, the done flag is raised. Until then, the step reward is returned.
Constructor
- Parameters:
env_spec – environment specification of a simulated or real environment
space_des – desired state space a.k.a. goal space
rew_fcn – reward function, an instance of a subclass of RewFcn
- has_succeeded(state: ndarray) bool [source]
Check whether the agent has succeeded.
- Parameters:
state – the environment's current state
- Returns:
True if succeeded
- reset(env_spec: EnvSpec, space_des: Optional[Space] = None, **kwargs)[source]
Reset the task.
- Parameters:
env_spec – environment specification
space_des – new desired state space a.k.a. goal space
kwargs – keyword arguments forwarded to the reward function, e.g. the initial state
- property space_des: Space
Get the desired state (same dimensions as the environment’s state). Only override this if the task has a desired state.
- step_rew(state: ndarray, act: ndarray, remaining_steps: Optional[int] = None) float [source]
Get the step reward, e.g. from a function of the states and actions.
Note
It is strongly recommended to call this method every environment step.
- Parameters:
state – current state
act – current action
remaining_steps – number of time steps left in the episode
- Return rew:
current reward
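A hedged construction sketch for DesSpaceTask (module paths assumed; BoxSpace is assumed to be an available Space implementation and env.spec to provide the EnvSpec):

    import numpy as np
    from pyrado.spaces.box import BoxSpace  # assumed concrete Space implementation
    from pyrado.tasks.desired_space import DesSpaceTask  # module path assumed
    from pyrado.tasks.reward_functions import MinusOnePerStepRewFcn

    spec = env.spec  # EnvSpec of an existing environment (assumed)
    # Desired subspace: done once the (here 2-dimensional) state lies within +/- 0.05 around the origin
    space_des = BoxSpace(-0.05 * np.ones(2), 0.05 * np.ones(2))
    task = DesSpaceTask(spec, space_des, rew_fcn=MinusOnePerStepRewFcn())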
desired_state
- class DesStateTask(env_spec: EnvSpec, state_des: ndarray, rew_fcn: RewFcn, success_fcn: Optional[Callable] = None)[source]
Bases:
Task
Task class for moving to a desired state. Operates on the error in state and action.
Constructor
- Parameters:
env_spec – environment specification of a simulated or real environment
state_des – desired state a.k.a. goal state
rew_fcn – reward function, an instance of a subclass of RewFcn
success_fcn – function to determine if the task was solved, by default (None) this task runs endlessly
- has_succeeded(state: ndarray) bool [source]
Check whether the agent has succeeded.
- Parameters:
state – the environment's current state
- Returns:
True if succeeded
- reset(env_spec: EnvSpec, state_des: Optional[ndarray] = None, **kwargs)[source]
Reset the task.
- Parameters:
env_spec – environment specification
state_des – new desired state a.k.a. goal state
kwargs – keyword arguments forwarded to the reward function, e.g. the initial state
- property state_des: ndarray
Get the desired state (same dimensions as the environment’s state). Only override this if the task has a desired state.
- step_rew(state: ndarray, act: ndarray, remaining_steps: Optional[int] = None) float [source]
Get the step reward, e.g. from a function of the states and actions.
Note
It is strongly recommended to call this method every environment step.
- Parameters:
state – current state
act – current action
remaining_steps – number of time steps left in the episode
- Return rew:
current reward
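A hedged construction sketch for DesStateTask using a quadratic error reward and a proximity-based success check (module paths assumed; the EnvSpec is assumed to expose state_space and act_space with a flat_dim attribute; the success_fcn is assumed to receive the error between desired and current state, compare proximity_succeeded in the utils section at the end of this page):

    from functools import partial
    import numpy as np
    from pyrado.tasks.desired_state import DesStateTask  # module path assumed
    from pyrado.tasks.reward_functions import QuadrErrRewFcn
    from pyrado.tasks.utils import proximity_succeeded  # module path assumed

    spec = env.spec  # EnvSpec of an existing environment (assumed)
    state_des = np.zeros(spec.state_space.flat_dim)  # goal state at the origin (flat_dim assumed)
    rew_fcn = QuadrErrRewFcn(Q=np.eye(spec.state_space.flat_dim), R=1e-3 * np.eye(spec.act_space.flat_dim))
    # Succeed once the norm of the state error falls below 2.5 cm (threshold is illustrative)
    task = DesStateTask(spec, state_des, rew_fcn, success_fcn=partial(proximity_succeeded, thold_dist=0.025))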
- class RadiallySymmDesStateTask(env_spec: ~pyrado.utils.data_types.EnvSpec, state_des: ~numpy.ndarray, rew_fcn: ~pyrado.tasks.reward_functions.RewFcn, idcs: ~typing.Sequence[int], modulation: [<class 'float'>, <class 'numpy.ndarray'>] = 6.283185307179586, success_fcn: ~typing.Optional[~typing.Callable] = None)[source]
Bases:
DesStateTask
Task class for moving to a desired state. Operates on the error in state and action. In contrast to DesStateTask, a subset of the state is radially symmetric, e.g. an angular position.
Constructor
- Parameters:
env_spec – environment specification of a simulated or real environment
state_des – desired state a.k.a. goal state
rew_fcn – reward function, an instance of a subclass of RewFcn
idcs – indices of the state dimension(s) to apply the modulation
modulation – factor for the modulo operation, can be specified separately for each of the indices in idcs
success_fcn – function to determine if the task was solved, by default (None) this task runs endlessly
- step_rew(state: ndarray, act: ndarray, remaining_steps: Optional[int] = None) float [source]
Get the step reward, e.g. from a function of the states and actions.
Note
It is strongly recommended to call this method every environment step.
- Parameters:
state – current state
act – current action
remaining_steps – number of time steps left in the episode
- Return rew:
current reward
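A hedged sketch of how the radial symmetry is typically used for an angular state dimension (module path assumed; the state layout and index are hypothetical; the EnvSpec is assumed to expose act_space with a flat_dim attribute):

    import math
    import numpy as np
    from pyrado.tasks.desired_state import RadiallySymmDesStateTask  # module path assumed
    from pyrado.tasks.reward_functions import ExpQuadrErrRewFcn

    spec = env.spec  # EnvSpec of an existing environment (assumed)
    # Hypothetical 4-dim state [x, theta, x_dot, theta_dot] with the pole angle at index 1
    state_des = np.array([0.0, math.pi, 0.0, 0.0])
    rew_fcn = ExpQuadrErrRewFcn(Q=np.diag([1.0, 5.0, 1e-2, 1e-2]), R=1e-4 * np.eye(spec.act_space.flat_dim))
    # Treat the angle as radially symmetric, i.e. wrap its error modulo 2*pi
    task = RadiallySymmDesStateTask(spec, state_des, rew_fcn, idcs=[1], modulation=2 * math.pi)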
endless_flipping
- class FlippingTask(env_spec: EnvSpec, des_angle_delta: float, rew_fcn: RewFcn, angle_tol: float = 0.017453292519943295, endless: bool = True)[source]
Bases:
Task
Task class for flipping an object around one axis about a desired angle. Once the new angle is equal to the old angle plus/minus a given angle delta, the new angle becomes the old one and the flipping continues.
Constructor
- Parameters:
env_spec – environment specification of a simulated or real environment
des_angle_delta – desired angle that counts as a flip
rew_fcn – reward function, an instance of a subclass of RewFcn
angle_tol – tolerance for reaching the desired angle
endless – if True, the flipping task continues endlessly, i.e. it resets after each successful flip, else only one flip is desired
- has_succeeded(state: ndarray) bool [source]
Check whether the agent has succeeded.
- Parameters:
state – the environment's current state
- Returns:
True if succeeded
- reset(env_spec: EnvSpec, **kwargs)[source]
Reset the task.
- Parameters:
env_spec – environment specification
kwargs – keyword arguments forwarded to the reward function, e.g. the initial state
- step_rew(state: ndarray, act: ndarray, remaining_steps: Optional[int] = None) float [source]
Get the step reward, e.g. from a function of the states and actions.
Note
It is strongly recommended to call this method every environment step.
- Parameters:
state – current state
act – current action
remaining_steps – number of time steps left in the episode
- Return rew:
current reward
final_reward
- class BestStateFinalRewTask(wrapped_task: Task, factor: float)[source]
Bases:
TaskWrapper
Wrapper for tasks which yields a reward / cost on success / failure based on the best reward / cost observed in the current trajectory.
Constructor
- Parameters:
wrapped_task – task to wrap
factor – value to scale the final reward
- compute_final_rew(state: ndarray, remaining_steps: int) float [source]
Compute the reward / cost on task completion / fail of this task.
- Parameters:
state – current state of the environment
remaining_steps – number of time steps left in the episode
- Returns:
final reward of this task
- reset(**kwargs)[source]
Reset the task. Since the environment specification may change at every reset of the environment, we have to reset the task. This might also include resetting the members of the reward function if there are any.
- Parameters:
kwargs – optional arguments e.g. environment specification or new desired state
- step_rew(state: ndarray, act: ndarray, remaining_steps: int) float [source]
Get the step reward, e.g. from a function of the states and actions.
Note
It is strongly recommended to call this method every environment step.
- Parameters:
state – current state
act – current action
remaining_steps – number of time steps left in the episode
- Return rew:
current reward
- property yielded_final_rew: bool
Get the flag that signals if this instance already yielded its final reward.
- class FinalRewMode(state_dependent: bool = False, time_dependent: bool = False, always_positive: bool = False, always_negative: bool = False, user_input: bool = False)[source]
Bases:
tuple
The specification of how the final state should be rewarded or punished
Create new instance of FinalRewMode(state_dependent, time_dependent, always_positive, always_negative, user_input)
- property always_negative
Alias for field number 3
- property always_positive
Alias for field number 2
- property state_dependent
Alias for field number 0
- property time_dependent
Alias for field number 1
- property user_input
Alias for field number 4
- class FinalRewTask(wrapped_task: Task, mode: FinalRewMode, factor: float = 1000.0)[source]
Bases:
TaskWrapper
Wrapper for tasks which yields a reward / cost on success / failure
- Usage:
task = FinalRewTask(DesStateTask(spec, state_des, rew_fcn, success_fcn), mode=FinalRewMode(), factor=1e3)
Constructor
- Parameters:
wrapped_task – task to wrap
mode – mode for calculating the final reward
factor – (positive) value to scale the final reward. The factor is ignored if mode.time_dependent is True
- compute_final_rew(state: ndarray, remaining_steps: int) float [source]
Compute the reward / cost on task completion / fail of this task.
- Parameters:
state – current state of the environment
remaining_steps – number of time steps left in the episode
- Returns:
final reward of this task
- reset(**kwargs)[source]
Reset the task. Since the environment specification may change at every reset of the environment, we have to reset the task. This might also include resetting the members of the reward function if there are any.
- Parameters:
kwargs – optional arguments e.g. environment specification or new desired state
- property yielded_final_rew: bool
Get the flag that signals if this instance already yielded its final reward.
goalless
- class GoallessTask(env_spec: EnvSpec, rew_fcn: RewFcn)[source]
Bases:
Task
Task which has no desired state or desired space; it runs endlessly.
Constructor
- Parameters:
env_spec – environment specification
rew_fcn – reward function, an instance of a subclass of RewFcn
- has_succeeded(state: ndarray) bool [source]
Check whether the agent has succeeded.
- Parameters:
state – the environment's current state
- Returns:
True if succeeded
- reset(env_spec: EnvSpec, **kwargs)[source]
Reset the task.
- Parameters:
env_spec – environment specification
kwargs – keyword arguments forwarded to the reward function, e.g. the initial state
- step_rew(state: ndarray, act: ndarray, remaining_steps: Optional[int] = None) float [source]
Get the step reward, e.g. from a function of the states and actions.
Note
It is strongly recommended to call this method every environment step.
- Parameters:
state – current state
act – current action
remaining_steps – number of time steps left in the episode
- Return rew:
current reward
- class OptimProxyTask(env_spec: EnvSpec, rew_fcn: StateBasedRewFcn)[source]
Bases:
Task
Task for wrapping classical optimization problems a.k.a. (nonlinear) programming into Pyrado
Constructor
- Parameters:
env_spec – environment specification
rew_fcn – state-based reward function that maps the state to a scalar value
- has_succeeded(state: ndarray) bool [source]
Check whether the agent has succeeded.
- Parameters:
state – the environment's current state
- Returns:
True if succeeded
- reset(env_spec, **kwargs)[source]
Reset the task. Since the environment specification may change at every reset of the environment, we have to reset the task. This might also include resetting the members of the reward function if there are any.
- Parameters:
kwargs – optional arguments e.g. environment specification or new desired state
- property rew_fcn: StateBasedRewFcn
Get the reward function.
- step_rew(state: ndarray, act: Optional[ndarray] = None, remaining_steps: Optional[int] = None) float [source]
Get the step reward, e.g. from a function of the states and actions.
Note
It is strongly recommended to call this method every environment step.
- Parameters:
state – current state
act – current action
remaining_steps – number of time steps left in the episode
- Return rew:
current reward
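A hedged sketch of wrapping a classical minimization problem (here the Rosenbrock function) via OptimProxyTask and StateBasedRewFcn (module path assumed; env.spec is assumed to provide the EnvSpec):

    import numpy as np
    from pyrado.tasks.goalless import OptimProxyTask  # module path assumed
    from pyrado.tasks.reward_functions import StateBasedRewFcn

    def rosenbrock(x: np.ndarray) -> float:
        # Classical minimization benchmark; the state plays the role of the solution candidate
        return float(np.sum(100.0 * (x[1:] - x[:-1] ** 2) ** 2 + (1.0 - x[:-1]) ** 2))

    spec = env.spec  # EnvSpec of an existing environment (assumed)
    # flip_sign=True turns the minimization problem into reward maximization
    task = OptimProxyTask(spec, StateBasedRewFcn(rosenbrock, flip_sign=True))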
masked
- class MaskedTask(env_spec: EnvSpec, wrapped_task: Task, state_idcs: Union[str, int], action_idcs: Optional[Union[str, int]] = None)[source]
Bases:
Task
Task using only a subset of state and actions
Constructor
- Parameters:
env_spec – environment specification
wrapped_task – task for the selected part of the state-action space
state_idcs – indices of the selected states
action_idcs – indices of the selected actions
- final_rew(state: ndarray, remaining_steps: int) float [source]
Get the final reward, e.g. bonus for success or a malus for failure. This function loops through all tasks (unfolding the wrappers) and calls their compute_final_rew method.
- Parameters:
state – current state forwarded to compute_final_rew
remaining_steps – number of time steps left in the episode forwarded to compute_final_rew
- Return rew:
summed final reward
- has_failed(state: ndarray) bool [source]
Check whether the agent has failed. The default implementation checks if the state is out of bounds.
- Parameters:
state – the environment's current state
- Returns:
True if failed
- has_succeeded(state: ndarray) bool [source]
Check whether the agent has succeeded.
- Parameters:
state – the environment's current state
- Returns:
True if succeeded
- is_done(state: ndarray) bool [source]
Check if a final state is reached.
Note
It is strongly recommended to call this method every environment step.
- Parameters:
state – current state
- Return done:
done flag
- reset(env_spec: EnvSpec, **kwargs)[source]
Reset the task. Since the environment specification may change at every reset of the environment, we have to reset the task. This might also include resetting the members of the reward function if there are any.
- Parameters:
kwargs – optional arguments e.g. environment specification or new desired state
- property state_des: ndarray
Get the desired state (same dimensions as the environment’s state). Only override this if the task has a desired state.
- step_rew(state: ndarray, act: ndarray, remaining_steps: int) float [source]
Get the step reward, e.g. from a function of the states and actions.
Note
It is strongly recommended to call this method every environment step.
- Parameters:
state – current state
act – current action
remaining_steps – number of time steps left in the episode
- Return rew:
current reward
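A hedged sketch of restricting a task to a subset of the state (module path assumed; the label and the sub-task are hypothetical; the wrapped task must be defined on the selected part of the state-action space):

    from pyrado.tasks.masked import MaskedTask  # module path assumed

    spec = env.spec  # full EnvSpec of an existing environment (assumed)
    # sub_task is a previously constructed task that only operates on the selected state dimension(s)
    task = MaskedTask(spec, wrapped_task=sub_task, state_idcs=["Box_Z"])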
parallel
- class ParallelTasks(tasks: Sequence[Task], hold_rew_when_done: bool = False, allow_failures: bool = False, easily_satisfied: bool = False, verbose: bool = False)[source]
Bases:
Task
Task class for a set of tasks a.k.a. goals which can be achieved in any order or parallel
Constructor
- Parameters:
tasks – sequence of tasks a.k.a. goals
hold_rew_when_done – if True reward values for done tasks will be stored and added every step
allow_failures – if True this allows to continue after one sub-task failed, by default False
easily_satisfied – if True one successful subtask is enough to make the complete task successful, by default False. Use this wisely.
verbose – print messages on task completion
Note
This task can also be wrapped by a FinalRewTask to enjoy modularity.
hold_rew_when_done=True only makes sense for positive rewards.
- compute_final_rew(state: ndarray, remaining_steps: int) float [source]
Compute the reward / cost on task completion / fail of this task. Since this task holds multiple sub-tasks, the final reward / cost is computed for them, too.
Note
The ParallelTasks class is not a subclass of TaskWrapper, i.e. this function only looks at the immediate sub-tasks.
- Parameters:
state – current state of the environment
remaining_steps – number of time steps left in the episode
- Returns:
final reward of all sub-tasks
- has_succeeded(state: Optional[ndarray] = None) bool [source]
Check if this task is done. The ParallelTasks instance is successful if all sub-tasks are successful.
- Parameters:
state – the environment's current state
- Returns:
True if succeeded
- property rew_fcn: list
Get a list of all reward functions.
- property space_des: list
Get a list of all desired spaces.
- property state_des: list
Get a list of all desired states.
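A hedged usage sketch combining previously constructed sub-tasks that may be solved in any order (the sub-task names are hypothetical; module path assumed):

    from pyrado.tasks.parallel import ParallelTasks  # module path assumed

    # task_reach and task_upright are previously constructed Task instances (hypothetical)
    task = ParallelTasks([task_reach, task_upright], hold_rew_when_done=True, verbose=True)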
predefined
- create_check_all_boundaries_task(env_spec: EnvSpec, penalty: float) FinalRewTask [source]
Create a task that checks if any of the state space bounds is violated. This checks every limit and not just a subspace of the state, as could be the case when using a MaskedTask.
Note
This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.
- Parameters:
env_spec – environment specification
penalty – scalar cost (positive values) for violating the bounds
- Returns:
task that checks all bounds of the state space and penalizes their violation
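A hedged usage sketch for this factory (module path assumed; env.spec is assumed to provide the EnvSpec):

    from pyrado.tasks.predefined import create_check_all_boundaries_task  # module path assumed

    spec = env.spec  # EnvSpec of an existing environment (assumed)
    # penalty: positive scalar cost applied when any state space bound is violated
    task = create_check_all_boundaries_task(spec, penalty=1e3)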
- create_collision_task(env_spec: EnvSpec, factor: float) MaskedTask [source]
Create a task which punishes collision costs given a collision model with pairs of bodies. This task only looks at the instantaneous collision cost.
Note
This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.
- Parameters:
env_spec – environment specification
factor – cost / reward function scaling factor
- Returns:
masked task that only considers a subspace of all observations
- create_flipping_task(env_spec: EnvSpec, obs_labels: Sequence[str], des_angle_delta: float = 1.5707963267948966, endless: bool = True) MaskedTask [source]
Create a task for rotating an object.
Note
This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.
- Parameters:
env_spec – environment specification
obs_labels – labels for selection, e.g. [‘Box_A’]. This needs to match the observations’ names in RcsPySim
des_angle_delta – desired angle to rotate. If reached, the task is reset, and rotating continues.
endless – if True, the task will promote endlessly repeated flipping about the desired angle, else only one flip is desired
- Returns:
masked task that only considers a subspace of all observations
- create_forcemin_task(env_spec: EnvSpec, obs_labels: Sequence[str], Q: ndarray) MaskedTask [source]
Create a task which punishes the amount of used force.
Note
This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.
- Parameters:
env_spec – environment specification
obs_labels – labels for selection, e.g. [‘WristLoadCellLBR_R_Fy’]. This needs to match the observations’ names in RcsPySim
Q – weight matrix of dim NxN with N=num_forces for the quadratic force costs
- Returns:
masked task that only considers a subspace of all observations
- create_goal_dist_distvel_task(env_spec: EnvSpec, ds_label: int, rew_fcn: RewFcn, succ_thold: float = 0.01) MaskedTask [source]
Create a task that rewards minimizing the GoalDistance of dynamical system movement primitives (see RcsPySim) as well as its rate of change.
Note
This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.
- Parameters:
env_spec – environment specification
ds_label – label of the dynamical system (see RcsPySim)
rew_fcn – reward function
succ_thold – once the object of interest is closer than this threshold, the task is considered successfully solved
- Returns:
masked task that only considers a subspace of all observations
- create_goal_dist_task(env_spec: EnvSpec, ds_label: int, rew_fcn: RewFcn, succ_thold: float = 0.01) MaskedTask [source]
Create a task that rewards minimizing the GoalDistance of dynamical system movement primitives (see RcsPySim).
Note
This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.
- Parameters:
env_spec – environment specification
ds_label – label of the dynamical system (see RcsPySim)
rew_fcn – reward function
succ_thold – once the object of interest is closer than this threshold, the task is considered successfully solved
- Returns:
masked task that only considers a subspace of all observations
- create_home_pos_task(env_spec: EnvSpec, obs_labels: Sequence[str], state_des: ndarray) MaskedTask [source]
Create a task for moving the robot to a safe position.
Note
This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.
- Parameters:
env_spec – environment specification
obs_labels – labels for selection, e.g. [‘PowerGrasp_R_Y’, ‘PowerGrasp_R_Z’]. This needs to match the observations’ names in RcsPySim
state_des – desired state (depends on the coordinate system). If reached, the task is over.
- Returns:
masked task that only considers a subspace of all observations
- create_lifting_task(env_spec: EnvSpec, obs_labels: Sequence[str], des_height: Union[float, ndarray], succ_thold: float = 0.01) MaskedTask [source]
Create a task for lifting an object.
Note
This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.
- Parameters:
env_spec – environment specification
obs_labels – labels for selection, e.g. [‘Box_Z’]. This needs to match the observations’ names in RcsPySim
des_height – desired height of the object (depends on the coordinate system). If reached, the task is over.
succ_thold – once the object of interest is closer than this threshold, the task is considered successfully solved
- Returns:
masked task that only considers a subspace of all observations
- create_task_space_discrepancy_task(env_spec: EnvSpec, rew_fcn: RewFcn) MaskedTask [source]
Create a task which punishes the discrepancy between the actual and the commanded state of the observed body. The observed body is specified in the associated experiment configuration file in RcsPySim. This task only looks at the X and Z coordinates.
Note
This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.
- Parameters:
env_spec – environment specification
rew_fcn – reward function
- Returns:
masked task that only considers a subspace of all observations
reward_functions
- class AbsErrRewFcn(q: ndarray, r: ndarray)[source]
Bases:
RewFcn
Reward function that returns the negative weighted sum of the absolute errors.
Constructor
- Parameters:
q – weight vector for the state errors
r – weight vector for the action errors
- class CompoundRewFcn(rew_fcns: Sequence)[source]
Bases:
RewFcn
Combine multiple reward functions
Constructor
- Parameters:
rew_fcns – sequence, e.g. list or tuple, of reward functions to combine
- class CosOfOneEleRewFcn(idx: int)[source]
Bases:
RewFcn
Reward function that takes the cosine of one element of the state, given by an index. Maximum reward of +1 at state[idx] = +/- state_des[idx], minimum reward of -1 at state[idx] = 0.
Constructor
- Parameters:
idx – index of the element (angle) of interest
- class ExpQuadrErrRewFcn(Q: Union[ndarray, list], R: Union[ndarray, list])[source]
Bases:
QuadrErrRewFcn
Reward function that returns the exp of the weighted sum of squared errors
Constructor
- Parameters:
Q – weight matrix for the state errors (positive semi-definite)
R – weight matrix for the action errors (positive definite)
- class ForwardVelocityRewFcn(dt: float, idx_fwd: int, fwd_rew_weight: float, ctrl_cost_weight: float)[source]
Bases:
RewFcn
Reward function for the HalfCheetahSim and SwimmerSim environments, encouraging the agent to run forward
Note
OpenAI Gym calculates the velocity via forward differences, while here we get the velocity directly from the simulator.
See also
https://github.com/openai/gym/blob/master/gym/envs/mujoco/half_cheetah.py https://github.com/openai/gym/blob/master/gym/envs/mujoco/swimmer_v3.py
Constructor
Note
The last x position, which is rewarded, is initialized by reset(), since the (sampled) initial state is unknown at construction time of the task, i.e. this reward function.
- Parameters:
dt – simulation step size [s]
idx_fwd – index of the state dimension that marks the forward direction
fwd_rew_weight – scaling factor for the forward velocity reward
ctrl_cost_weight – scaling factor for the control cost
- class ForwardVelocityRewFcnAnt(dt: float, contact_force_range: Tuple[float], contact_cost_weight: float = 0.0005, ctrl_cost_weight: float = 0.5, healthy_reward: float = 1.0, terminate_when_unhealthy: bool = True, healthy_z_range: Tuple[float] = (0.2, 1.0))[source]
Bases:
RewFcn
Reward function for the AntSim environment, encouraging the agent to run forward
Note
OpenAI Gym calculates the velocity via forward differences, while here we get the velocity directly from the simulator.
Constructor
Note
The last x position, which is rewarded, is initialized by reset(), since the (sampled) initial state is unknown at construction time of the task, i.e. this reward function.
- Parameters:
dt – simulation step size [s]
idx_fwd – index of the state dimension that marks the forward direction
fwd_rew_weight – scaling factor for the forward velocity reward
ctrl_cost_weight – scaling factor for the control cost
- property healthy_reward
- property is_healthy: bool
- class ForwardVelocityRewFcnHumanoid(dt: float, contact_cost_weight: float = 5e-07, ctrl_cost_weight: float = 0.1, forward_reward_weight: float = 1.25, healthy_reward: float = 5.0, terminate_when_unhealthy: bool = True, healthy_z_range: Tuple[float] = (1.0, 2.0), contact_cost_range: Tuple[float] = (-inf, 10.0))[source]
Bases:
RewFcn
Reward function for the HumanoidSim environment, encouraging the agent to run forward
Note
OpenAI Gym calculates the velocity via forward differences, while here we get the velocity directly from the simulator.
Constructor
Note
The last x position, which is rewarded, is initialized by reset(), since the (sampled) initial state is unknown at construction time of the task, i.e. this reward function.
- Parameters:
dt – simulation step size [s]
idx_fwd – index of the state dimension that marks the forward direction
fwd_rew_weight – scaling factor for the forward velocity reward
ctrl_cost_weight – scaling factor for the control cost
- property healthy_reward
- property is_healthy: bool
- class MinusOnePerStepRewFcn[source]
Bases:
RewFcn
Reward function that yields -1 reward every time step. A positive or negative final reward can be specified on the Task-level.
- class PlusOnePerStepRewFcn[source]
Bases:
RewFcn
Reward function that yields +1 reward every time step. A positive or negative final reward can be specified on the Task-level.
- class QCartPoleSwingUpRewFcn(factor: float = 0.9)[source]
Bases:
RewFcn
Custom reward function for QCartPoleSwingUpSim.
Constructor
- Parameters:
factor – weighting factor of rotation error to position error
- class QuadrErrRewFcn(Q: Union[ndarray, list], R: Union[ndarray, list])[source]
Bases:
RewFcn
Reward function that returns the negative weighted sum of squared errors.
Constructor
- Parameters:
Q – weight matrix for the state errors (positive semi-definite)
R – weight matrix for the action errors (positive definite)
- class SSCosOfOneEleRewFcn(idx: int)[source]
Bases:
CosOfOneEleRewFcn
Reward function that takes the shifted and scaled cosine of one element of the state, given by an index. Maximum reward of +1 at state[idx] = +/- state_des[idx], minimum reward of 0 at state[idx] = 0.
Constructor
- Parameters:
idx – index of the element (angle) of interest
- class ScaledExpQuadrErrRewFcn(Q: [<class 'numpy.ndarray'>, <class 'list'>], R: [<class 'numpy.ndarray'>, <class 'list'>], state_space: ~pyrado.spaces.base.Space, act_space: ~pyrado.spaces.base.Space, min_rew: float = 0.0001)[source]
Bases:
QuadrErrRewFcn
Reward function that returns the exp of the scaled weighted sum of squared errors
Constructor
Note
This reward function type depends on environment-specific parameters. Due to domain randomization, the reward function has to be re-initialized after every randomization of the environment, since obs_max and act_max can change when the domain parameters are randomized.
- Parameters:
Q – weight matrix for the state errors (positive semi-definite)
R – weight matrix for the action errors (positive definite)
state_space – for extracting the worst case (max cost) observation
act_space – for extracting the worst case (max cost) action
min_rew – minimum reward (only used for the scaling factor in the exponential reward function)
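A hedged construction sketch that also makes the note above concrete: rebuild the reward function from the current spaces after every domain randomization (the EnvSpec is assumed to expose state_space and act_space with a flat_dim attribute):

    import numpy as np
    from pyrado.tasks.reward_functions import ScaledExpQuadrErrRewFcn

    spec = env.spec  # EnvSpec of the (possibly re-randomized) environment (assumed)
    rew_fcn = ScaledExpQuadrErrRewFcn(
        Q=np.eye(spec.state_space.flat_dim),        # flat_dim attribute assumed
        R=1e-4 * np.eye(spec.act_space.flat_dim),
        state_space=spec.state_space,
        act_space=spec.act_space,
        min_rew=1e-4,
    )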
- class StateBasedRewFcn(fcn: Callable[[ndarray], float], flip_sign: bool = False)[source]
Bases:
object
Reward function which directly operates on the state a.k.a. solution. This class is supposed to be used for wrapping classical optimization problems into Pyrado, thus it is the negative of the loss function.
Constructor
- Parameters:
fcn – function for evaluating the state a.k.a. solution
flip_sign – return the negative of fcn, useful for turning minimization problems into maximization problems
- class UnderActuatedSwingUpRewFcn(c_pole: float = 1.0, c_cart: float = 0.2, c_act: float = 0.001, c_theta_sq: float = 1.0, c_theta_log: float = 0.1, idx_x: int = 0, idx_th: int = 1)[source]
Bases:
RewFcn
Reward function for the swing-up task on the Cart-Pole system similar to [1].
See also
- [1] W. Yu, J. Tan, C.K. Liu, G. Turk, “Preparing for the Unknown: Learning a Universal Policy with Online System Identification”, RSS, 2017
Constructor
- Parameters:
c_pole – scaling parameter for the pole angle cost
c_cart – scaling parameter for the cart position cost
c_act – scaling parameter for the control cost
c_theta_sq – scaling parameter for the quadratic angle deviation
c_theta_log – shifting parameter for the logarithm of the quadratic angle deviation
idx_x – index of the state representing the driving component of the system (e.g. cart position x)
idx_th – index of the state representing the rotating component of the system (e.g. pole angle theta)
sequential
- class SequentialTasks(tasks: Sequence[Task], start_idx: int = 0, hold_rew_when_done: bool = False, verbose: bool = False)[source]
Bases:
Task
Task class for a sequence of tasks a.k.a. goals
Constructor
- Parameters:
tasks – sequence of tasks a.k.a. goals, the order matters
start_idx – index of the task to start with, by default with the first one in the list
hold_rew_when_done – if True reward values for done tasks will be stored and added every step
verbose – print messages on task completion
Note
hold_rew_when_done=True only makes sense for positive rewards.
- compute_final_rew(state: ndarray, remaining_steps: int) float [source]
Compute the reward / cost on task completion / fail of this task. Since this task holds multiple sub-tasks, the final reward / cost is computed for them, too.
Note
The SequentialTasks class is not a subclass of TaskWrapper, i.e. this function only looks at the immediate sub-tasks.
- Parameters:
state – current state of the environment
remaining_steps – number of time steps left in the episode
- Returns:
final reward of all sub-tasks
- has_succeeded(state: ndarray) bool [source]
Check if this task is done. The SequentialTasks instance is successful if all sub-tasks are successful.
- Parameters:
state – the environment's current state
- Returns:
True if succeeded
- property idx_curr: int
Get the index of the currently active task.
- property state_des: ndarray
Get the desired state of the current task.
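A hedged usage sketch chaining previously constructed sub-tasks that must be solved in order (the sub-task names are hypothetical; module path assumed):

    from pyrado.tasks.sequential import SequentialTasks  # module path assumed

    # First reach the goal region, then hold the pose; both are previously constructed Task instances (hypothetical)
    task = SequentialTasks([task_reach, task_hold], start_idx=0, hold_rew_when_done=True, verbose=True)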
utils
- never_succeeded(err: Optional[ndarray] = None) bool [source]
The task is never marked successful, i.e. it runs until the environment's state or time is out of bounds.
- proximity_succeeded(err: ndarray, thold_dist: float, dims: int = -1) bool [source]
The task is solved successfully if the L2-norm of the selected dimensions of the error is smaller than the given threshold.
- Parameters:
err – error in state, i.e. difference between desired and current state
thold_dist – threshold for being solved
dims – selected dimensions, i.e. indices, for the state, by default all dimensions are selected
- Returns:
True if successful
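A hedged sketch of how this helper is typically bound to a task's success_fcn via functools.partial (module path assumed; the threshold is illustrative):

    from functools import partial
    from pyrado.tasks.utils import proximity_succeeded  # module path assumed

    # Succeed once the norm of the state error is below 5 cm; pass this as success_fcn to e.g. DesStateTask
    success_fcn = partial(proximity_succeeded, thold_dist=0.05)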