tasks

base

class Task[source]

Bases: ABC

Base class for all tasks in Pyrado. A task contains a desired state, a reward function, and a step function. The task also checks if the environment is done. Every environment should have exactly one task at a time.
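
Example (a minimal, hypothetical subclass sketch; the import paths are assumed to follow this page's module names, i.e. pyrado.tasks.base and pyrado.tasks.reward_functions, and the reward computation is hand-written instead of delegating to a RewFcn):

import numpy as np

from pyrado.tasks.base import Task
from pyrado.tasks.reward_functions import RewFcn, ZeroPerStepRewFcn
from pyrado.utils.data_types import EnvSpec


class ReachOriginTask(Task):
    """Hypothetical task: drive the state to the origin within a tolerance."""

    def __init__(self, env_spec: EnvSpec, tol: float = 1e-2):
        self._env_spec = env_spec
        self._tol = tol
        self._rew_fcn = ZeroPerStepRewFcn()  # placeholder object for the rew_fcn property

    @property
    def env_spec(self) -> EnvSpec:
        return self._env_spec

    @property
    def rew_fcn(self) -> RewFcn:
        return self._rew_fcn

    def has_succeeded(self, state: np.ndarray) -> bool:
        # Done once the state is within tol of the origin
        return bool(np.linalg.norm(state) < self._tol)

    def step_rew(self, state: np.ndarray, act: np.ndarray, remaining_steps: int) -> float:
        # Hand-written quadratic cost on state and action (negative reward)
        return float(-np.sum(state**2) - 1e-3 * np.sum(act**2))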

compute_final_rew(state: ndarray, remaining_steps: int) float[source]

Compute the final reward, e.g. a bonus for success or a malus for failure, for a single task.

Note

This function should only be overridden by tasks that manipulate the final reward.

Parameters:
  • state – current state

  • remaining_steps – number of time steps left in the episode

Returns:

final reward

property env_spec: EnvSpec

Get the specification of the environment the task is in.

final_rew(state: ndarray, remaining_steps: int) float[source]

Get the final reward, e.g. a bonus for success or a malus for failure. This function loops through all tasks (unfolding the wrappers) and calls their compute_final_rew method.

Parameters:
  • state – current state forwarded to compute_final_rew

  • remaining_steps – number of time steps left in the episode forwarded to compute_final_rew

Return rew:

summed final reward

has_failed(state: ndarray) bool[source]

Check whether the agent has failed. The default implementation checks if the state is out of bounds.

Parameters:

state – the environment’s current state

Returns:

True if failed

abstract has_succeeded(state: ndarray) bool[source]

Check whether the agent has succeeded.

Parameters:

state – the environment’s current state

Returns:

True if succeeded

is_done(state: ndarray) bool[source]

Check if a final state is reached.

Note

It is strongly recommended to call this method every environment step.

Parameters:

state – current state

Return done:

done flag

reset(**kwargs: Any)[source]

Reset the task. Since the environment specification may change at every reset of the environment, we have to reset the task. This might also include resetting the members of the reward function if there are any.

Parameters:

kwargs – optional arguments e.g. environment specification or new desired state

abstract property rew_fcn: RewFcn

Get the reward function.

property space_des: Space

Get the desired space (same dimensions as the environment’s state space). Only override this if the task has a desired space.

property state_des: ndarray

Get the desired state (same dimensions as the environment’s state). Only override this if the task has a desired state.

abstract step_rew(state: ndarray, act: ndarray, remaining_steps: int) float[source]

Get the step reward, e.g. from a function of the states and actions.

Note

It is strongly recommended to call this method every environment step.

Parameters:
  • state – current state

  • act – current action

  • remaining_steps – number of time steps left in the episode

Return rew:

current reward

class TaskWrapper(wrapped_task: Task)[source]

Bases: Task

Base for all task wrappers. Delegates all task methods to the wrapped task.

Constructor

Parameters:

wrapped_task – task to wrap

property env_spec: EnvSpec

Get the specification of the environment the task is in.

has_failed(state: ndarray) bool[source]

Check whether the agent has failed. The default implementation checks if the state is out of bounds.

Parameters:

state – the environment’s current state

Returns:

True if failed

has_succeeded(state: ndarray) bool[source]

Check whether the agent has succeeded.

Parameters:

state – the environment’s current state

Returns:

True if succeeded

is_done(state: ndarray) bool[source]

Check if a final state is reached.

Note

It is strongly recommended to call this method every environment step.

Parameters:

state – current state

Return done:

done flag

reset(**kwargs)[source]

Reset the task. Since the environment specification may change at every reset of the environment, we have to reset the task. This might also include resetting the members of the reward function if there are any.

Parameters:

kwargs – optional arguments e.g. environment specification or new desired state

property rew_fcn: RewFcn

Get the reward function.

property space_des: Space

Get the desired space (same dimensions as the environment’s state space). Only override this if the task has a desired space.

property state_des: ndarray

Get the desired state (same dimensions as the environment’s state). Only override this if the task has a desired state.

step_rew(state: ndarray, act: ndarray, remaining_steps: int) float[source]

Get the step reward, e.g. from a function of the states and actions.

Note

It is strongly recommended to call this method every environment step.

Parameters:
  • state – current state

  • act – current action

  • remaining_steps – number of time steps left in the episode

Return rew:

current reward

property wrapped_task

all_tasks(task)[source]

Iterates over the task chain.

Parameters:

task – outermost task of the chain

Returns:

an iterable over the whole chain from outermost to innermost
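
Example (a short sketch, assuming all_tasks is importable from pyrado.tasks.base, i.e. the module documented in this section):

from pyrado.tasks.base import all_tasks

def chain_length(task) -> int:
    # Walk the chain from the outermost wrapper to the innermost core task
    return sum(1 for _ in all_tasks(task))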

condition_only

class ConditionOnlyTask(env_spec: EnvSpec, condition_fcn: Callable, is_success_condition: bool)[source]

Bases: Task

Task class which yields zero reward at every time step and terminates when the given condition function returns True. This class is intended to be wrapped by FinalRewTask.

Constructor

Usage:

task = FinalRewTask(
    ConditionOnlyTask(<some EnvSpec>, <some Callable>, <True or False>),
    mode=FinalRewMode(time_dependent=True)
)

Parameters:
  • env_spec – environment specification of a simulated or real environment

  • condition_fcn – function to determine if the task was solved

  • is_success_condition – if True the condition_fcn returns True for a success, if False the condition_fcn returns True for a failure
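
Example (a hypothetical condition function; the assumption here is that condition_fcn is evaluated on the current state, analogously to has_succeeded):

import numpy as np

def box_is_lifted(state: np.ndarray) -> bool:
    # Hypothetical convention: state[2] holds the box height in meters
    return bool(state[2] > 0.3)

# Plugged in as shown in the usage snippet above:
# ConditionOnlyTask(env_spec, box_is_lifted, is_success_condition=True)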

property env_spec: EnvSpec

Get the specification of the environment the task is in.

has_failed(state: ndarray) bool[source]

Check whether the agent has failed. The default implementation checks if the state is out of bounds.

Parameters:

state – the environment’s current state

Returns:

True if failed

has_succeeded(state: ndarray) bool[source]

Check whether the agent has succeeded.

Parameters:

state – the environment’s current state

Returns:

True if succeeded

reset(env_spec: EnvSpec, condition_fcn: Optional[Callable] = None, is_success_condition: Optional[bool] = None, **kwargs)[source]

Reset the task.

Parameters:
  • env_spec – environment specification

  • condition_fcn – function to determine if the task was solved, by default (None) this task runs endlessly

  • is_success_condition – if True the condition_fcn returns True for a success, if False the condition_fcn returns True for a failure

property rew_fcn: RewFcn

Get the reward function.

step_rew(state: Optional[ndarray] = None, act: Optional[ndarray] = None, remaining_steps: Optional[int] = None) float[source]

Get the step reward, e.g. from a function of the states and actions.

Note

It is strongly recommended to call this method every environment step.

Parameters:
  • state – current state

  • act – current action

  • remaining_steps – number of time steps left in the episode

Return rew:

current reward

desired_space

class DesSpaceTask(env_spec: EnvSpec, space_des: Space, rew_fcn: RewFcn = ZeroPerStepRewFcn)[source]

Bases: Task

Task class for moving to a (fixed) desired state space. This task is designed with the idea in mind that it is only important if the state is in the desired (sub)space. If the state is in the desired space, the done flag is raised. Until then, the step reward is returned.

Constructor

Parameters:
  • env_spec – environment specification of a simulated or real environment

  • space_des – desired space a.k.a. goal space

  • rew_fcn – reward function, an instance of a subclass of RewFcn
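
Example (a construction sketch; the import paths follow this page's module names, and env_spec and goal_space are assumed to be provided by the caller):

from pyrado.tasks.desired_space import DesSpaceTask
from pyrado.tasks.reward_functions import MinusOnePerStepRewFcn

def make_reach_space_task(env_spec, goal_space) -> DesSpaceTask:
    # -1 per step encourages entering the goal space as fast as possible;
    # the task raises the done flag once the state is inside goal_space
    return DesSpaceTask(env_spec, goal_space, MinusOnePerStepRewFcn())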

property env_spec: EnvSpec

Get the specification of the environment the task is in.

has_succeeded(state: ndarray) bool[source]

Check whether the agent has succeeded.

Parameters:

state – the environment’s current state

Returns:

True if succeeded

reset(env_spec: EnvSpec, space_des: Optional[Space] = None, **kwargs)[source]

Reset the task.

Parameters:
  • env_spec – environment specification

  • space_des – new desired space a.k.a. goal space

  • kwargs – keyword arguments forwarded to the reward function, e.g. the initial state

property rew_fcn: RewFcn

Get the reward function.

property space_des: Space

Get the desired space (same dimensions as the environment’s state space). Only override this if the task has a desired space.

step_rew(state: ndarray, act: ndarray, remaining_steps: Optional[int] = None) float[source]

Get the step reward, e.g. from a function of the states and actions.

Note

It is strongly recommended to call this method every environment step.

Parameters:
  • state – current state

  • act – current action

  • remaining_steps – number of time steps left in the episode

Return rew:

current reward

desired_state

class DesStateTask(env_spec: EnvSpec, state_des: ndarray, rew_fcn: RewFcn, success_fcn: Optional[Callable] = None)[source]

Bases: Task

Task class for moving to a desired state. Operates on the error in state and action.

Constructor

Parameters:
  • env_spec – environment specification of a simulated or real environment

  • state_des – desired state a.k.a. goal state

  • rew_fcn – reward function, an instance of a subclass of RewFcn

  • success_fcn – function to determine if the task was solved, by default (None) this task runs endlessly
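
Example (a construction sketch; the import paths follow this page's module names, it is assumed that the spaces in env_spec expose a flat_dim attribute, and the weights and the 5 cm threshold are made up for illustration):

import functools
import numpy as np

from pyrado.tasks.desired_state import DesStateTask
from pyrado.tasks.reward_functions import QuadrErrRewFcn
from pyrado.tasks.utils import proximity_succeeded

def make_reach_task(env_spec, state_des: np.ndarray) -> DesStateTask:
    rew_fcn = QuadrErrRewFcn(
        Q=np.eye(state_des.size),                      # weights on the state error
        R=1e-2 * np.eye(env_spec.act_space.flat_dim),  # weights on the action error
    )
    # Bind the threshold so the function can serve as success_fcn
    success_fcn = functools.partial(proximity_succeeded, thold_dist=0.05)
    return DesStateTask(env_spec, state_des, rew_fcn, success_fcn)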

property env_spec: EnvSpec

Get the specification of the environment the task is in.

has_succeeded(state: ndarray) bool[source]

Check whether the agent has succeeded.

Parameters:

state – the environment’s current state

Returns:

True if succeeded

reset(env_spec: EnvSpec, state_des: Optional[ndarray] = None, **kwargs)[source]

Reset the task.

Parameters:
  • env_spec – environment specification

  • state_des – new desired state a.k.a. goal state

  • kwargs – keyword arguments forwarded to the reward function, e.g. the initial state

property rew_fcn: RewFcn

Get the reward function.

property state_des: ndarray

Get the desired state (same dimensions as the environment’s state). Only override this if the task has a desired state.

step_rew(state: ndarray, act: ndarray, remaining_steps: Optional[int] = None) float[source]

Get the step reward, e.g. from a function of the states and actions.

Note

It is strongly recommended to call this method every environment step.

Parameters:
  • state – current state

  • act – current action

  • remaining_steps – number of time steps left in the episode

Return rew:

current reward

class RadiallySymmDesStateTask(env_spec: EnvSpec, state_des: ndarray, rew_fcn: RewFcn, idcs: Sequence[int], modulation: Union[float, ndarray] = 6.283185307179586, success_fcn: Optional[Callable] = None)[source]

Bases: DesStateTask

Task class for moving to a desired state. Operates on the error in state and action. In contrast to DesStateTask, a subset of the state is radially symmetric, e.g. an angular position.

Constructor

Parameters:
  • env_spec – environment specification of a simulated or real environment

  • state_des – desired state a.k.a. goal state

  • rew_fcn – reward function, an instance of a subclass of RewFcn

  • idcs – indices of the state dimension(s) to apply the modulation

  • modulation – factor for the modulo operation, can be specified separately for each entry of idcs

  • success_fcn – function to determine if the task was solved, by default (None) this task runs endlessly

step_rew(state: ndarray, act: ndarray, remaining_steps: Optional[int] = None) float[source]

Get the step reward, e.g. from a function of the states and actions.

Note

It is strongly recommended to call this method every environment step.

Parameters:
  • state – current state

  • act – current action

  • remaining_steps – number of time steps left in the episode

Return rew:

current reward

endless_flipping

class FlippingTask(env_spec: EnvSpec, des_angle_delta: float, rew_fcn: RewFcn, angle_tol: float = 0.017453292519943295, endless: bool = True)[source]

Bases: Task

Task class for flipping an object around one axis about a desired angle. Once the new angle is equal to the old angle plus/minus a given angle delta, the new angle becomes the old one and the flipping continues.

Constructor

Parameters:
  • env_spec – environment specification of a simulated or real environment

  • des_angle_delta – desired angle that counts as a flip

  • rew_fcn – reward function, an instance of a subclass of RewFcn

  • angle_tol – tolerance for reaching the desired angle

  • endless – if True, the flipping continues endlessly; otherwise the task is done after one flip

property env_spec: EnvSpec

Get the specification of the environment the task is in.

has_succeeded(state: ndarray) bool[source]

Check whether the agent has succeeded.

Parameters:

state – the environment’s current state

Returns:

True if succeeded

reset(env_spec: EnvSpec, **kwargs)[source]

Reset the task.

Parameters:
  • env_spec – environment specification

  • kwargs – keyword arguments forwarded to the reward function, e.g. the initial state

property rew_fcn: RewFcn

Get the reward function.

step_rew(state: ndarray, act: ndarray, remaining_steps: Optional[int] = None) float[source]

Get the step reward, e.g. from a function of the states and actions.

Note

It is strongly recommended to call this method every environment step.

Parameters:
  • state – current state

  • act – current action

  • remaining_steps – number of time steps left in the episode

Return rew:

current reward

final_reward

class BestStateFinalRewTask(wrapped_task: Task, factor: float)[source]

Bases: TaskWrapper

Wrapper for tasks which yields a reward / cost on success / failure based on the best reward / cost observed in the current trajectory.

Constructor

Parameters:
  • wrapped_task – task to wrap

  • factor – value to scale the final reward

compute_final_rew(state: ndarray, remaining_steps: int) float[source]

Compute the reward / cost on task completion / failure of this task.

Parameters:
  • state – current state of the environment

  • remaining_steps – number of time steps left in the episode

Returns:

final reward of this task

reset(**kwargs)[source]

Reset the task. Since the environment specification may change at every reset of the environment, we have to reset the task. This might also include resetting the members of the reward function if there are any.

Parameters:

kwargs – optional arguments e.g. environment specification or new desired state

step_rew(state: ndarray, act: ndarray, remaining_steps: int) float[source]

Get the step reward, e.g. from a function of the states and actions.

Note

It is strongly recommended to call this method every environment step.

Parameters:
  • state – current state

  • act – current action

  • remaining_steps – number of time steps left in the episode

Return rew:

current reward

property yielded_final_rew: bool

Get the flag that signals if this instance already yielded its final reward.

class FinalRewMode(state_dependent: bool = False, time_dependent: bool = False, always_positive: bool = False, always_negative: bool = False, user_input: bool = False)[source]

Bases: tuple

The specification of how the final state should be rewarded or punished

Create new instance of FinalRewMode(state_dependent, time_dependent, always_positive, always_negative, user_input)

property always_negative

Alias for field number 3

property always_positive

Alias for field number 2

property state_dependent

Alias for field number 0

property time_dependent

Alias for field number 1

property user_input

Alias for field number 4

class FinalRewTask(wrapped_task: Task, mode: FinalRewMode, factor: float = 1000.0)[source]

Bases: TaskWrapper

Wrapper for tasks which yields a reward / cost on success / failure

Usage:

task = FinalRewTask(DesStateTask(spec, state_des, rew_fcn, success_fcn), mode=FinalRewMode(), factor=1e3)

Constructor

Parameters:
  • wrapped_task – task to wrap

  • mode – mode for calculating the final reward

  • factor – (positive) value to scale the final reward. The factor is ignored if mode.time_dependent is True

compute_final_rew(state: ndarray, remaining_steps: int) float[source]

Compute the reward / cost on task completion / failure of this task.

Parameters:
  • state – current state of the environment

  • remaining_steps – number of time steps left in the episode

Returns:

final reward of this task

reset(**kwargs)[source]

Reset the task. Since the environment specification may change at every reset of the environment, we have to reset the task. This might also include resetting the members of the reward function if there are any.

Parameters:

kwargs – optional arguments e.g. environment specification or new desired state

property yielded_final_rew: bool

Get the flag that signals if this instance already yielded its final reward.

goalless

class GoallessTask(env_spec: EnvSpec, rew_fcn: RewFcn)[source]

Bases: Task

Task which has no desired state or desired space; it runs endlessly.

Constructor

Parameters:
  • env_spec – environment specification

  • rew_fcn – reward function, an instance of a subclass of RewFcn
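
Example (a goalless locomotion sketch; the import path follows this page's module names, and the forward direction being state index 0 as well as the weights are assumptions):

from pyrado.tasks.goalless import GoallessTask
from pyrado.tasks.reward_functions import ForwardVelocityRewFcn

def make_running_task(env_spec, dt: float) -> GoallessTask:
    rew_fcn = ForwardVelocityRewFcn(dt=dt, idx_fwd=0, fwd_rew_weight=1.0, ctrl_cost_weight=0.1)
    return GoallessTask(env_spec, rew_fcn)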

property env_spec: EnvSpec

Get the specification of the environment the task is in.

has_succeeded(state: ndarray) bool[source]

Check whether the agent has succeeded.

Parameters:

state – the environment’s current state

Returns:

True if succeeded

reset(env_spec: EnvSpec, **kwargs)[source]

Reset the task.

Parameters:
  • env_spec – environment specification

  • kwargs – keyword arguments forwarded to the reward function, e.g. the initial state

property rew_fcn: RewFcn

Get the reward function.

step_rew(state: ndarray, act: ndarray, remaining_steps: Optional[int] = None) float[source]

Get the step reward, e.g. from a function of the states and actions.

Note

It is strongly recommended to call this method every environment step.

Parameters:
  • state – current state

  • act – current action

  • remaining_steps – number of time steps left in the episode

Return rew:

current reward

class OptimProxyTask(env_spec: EnvSpec, rew_fcn: StateBasedRewFcn)[source]

Bases: Task

Task for wrapping classical optimization problems a.k.a. (nonlinear) programming into Pyrado

Constructor

Parameters:
  • env_spec – environment specification

  • rew_fcn – state-based reward function that maps the state to a scalar value
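
Example (a sketch for wrapping a classical minimization problem; OptimProxyTask is assumed to live in pyrado.tasks.goalless since it is documented in this section):

import numpy as np

from pyrado.tasks.goalless import OptimProxyTask
from pyrado.tasks.reward_functions import StateBasedRewFcn

def rosenbrock(x: np.ndarray) -> float:
    # Classical 2D Rosenbrock function, minimal at (1, 1)
    return float((1.0 - x[0]) ** 2 + 100.0 * (x[1] - x[0] ** 2) ** 2)

def make_optim_task(env_spec) -> OptimProxyTask:
    # flip_sign=True turns the minimization into a reward maximization
    return OptimProxyTask(env_spec, StateBasedRewFcn(rosenbrock, flip_sign=True))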

property env_spec: EnvSpec

Get the specification of the environment the task is in.

has_succeeded(state: ndarray) bool[source]

Check whether the agent has succeeded.

Parameters:

state – the environment’s current state

Returns:

True if succeeded

reset(env_spec, **kwargs)[source]

Reset the task. Since the environment specification may change at every reset of the environment, we have to reset the task. This might also include resetting the members of the reward function if there are any.

Parameters:

kwargs – optional arguments e.g. environment specification or new desired state

property rew_fcn: StateBasedRewFcn

Get the reward function.

step_rew(state: ndarray, act: Optional[ndarray] = None, remaining_steps: Optional[int] = None) float[source]

Get the step reward, e.g. from a function of the states and actions.

Note

It is strongly recommended to call this method every environment step.

Parameters:
  • state – current state

  • act – current action

  • remaining_steps – number of time steps left in the episode

Return rew:

current reward

masked

class MaskedTask(env_spec: EnvSpec, wrapped_task: Task, state_idcs: Union[str, int], action_idcs: Optional[Union[str, int]] = None)[source]

Bases: Task

Task using only a subset of the states and actions

Constructor

Parameters:
  • env_spec – environment specification

  • wrapped_task – task for the selected part of the state-action space

  • state_idcs – indices of the selected states

  • action_idcs – indices of the selected actions

property env_spec: EnvSpec

Get the specification of the environment the task is in.

final_rew(state: ndarray, remaining_steps: int) float[source]

Get the final reward, e.g. a bonus for success or a malus for failure. This function loops through all tasks (unfolding the wrappers) and calls their compute_final_rew method.

Parameters:
  • state – current state forwarded to compute_final_rew

  • remaining_steps – number of time steps left in the episode forwarded to compute_final_rew

Return rew:

summed final reward

has_failed(state: ndarray) bool[source]

Check whether the agent has failed. The default implementation checks if the state is out of bounds.

Parameters:

state – the environment’s current state

Returns:

True if failed

has_succeeded(state: ndarray) bool[source]

Check whether the agent has succeeded.

Parameters:

state – the environment’s current state

Returns:

True if succeeded

is_done(state: ndarray) bool[source]

Check if a final state is reached.

Note

It is strongly recommended to call this method every environment step.

Parameters:

state – current state

Return done:

done flag

reset(env_spec: EnvSpec, **kwargs)[source]

Reset the task. Since the environment specification may change at every reset of the environment, we have to reset the task. This might also include resetting the members of the reward function if there are any.

Parameters:

kwargs – optional arguments e.g. environment specification or new desired state

property rew_fcn: RewFcn

Get the reward function.

property state_des: ndarray

Get the desired state (same dimensions as the environment’s state). Only override this if the task has a desired state.

step_rew(state: ndarray, act: ndarray, remaining_steps: int) float[source]

Get the step reward, e.g. from a function of the states and actions.

Note

It is strongly recommended to call this method every environment step.

Parameters:
  • state – current state

  • act – current action

  • remaining_steps – number of time steps left in the episode

Return rew:

current reward

property wrapped_task: Task

parallel

class ParallelTasks(tasks: Sequence[Task], hold_rew_when_done: bool = False, allow_failures: bool = False, easily_satisfied: bool = False, verbose: bool = False)[source]

Bases: Task

Task class for a set of tasks a.k.a. goals which can be achieved in any order or in parallel

Constructor

Parameters:
  • tasks – sequence of tasks a.k.a. goals

  • hold_rew_when_done – if True reward values for done tasks will be stored and added every step

  • allow_failures – if True, continue even after a sub-task has failed, by default False

  • easily_satisfied – if True, one successful sub-task is enough to make the complete task successful, by default False. Use this wisely.

  • verbose – print messages on task completion

Note

This task can also be wrapped by a FinalRewTask to enjoy modularity.

hold_rew_when_done=True only makes sense for positive rewards.
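
Example (a combination sketch; the import paths follow this page's module names, and task_a and task_b stand for any two already constructed tasks):

from pyrado.tasks.final_reward import FinalRewMode, FinalRewTask
from pyrado.tasks.parallel import ParallelTasks

def combine_tasks(task_a, task_b) -> FinalRewTask:
    # Both goals can be solved in any order; keep the rewards of solved sub-tasks
    both = ParallelTasks([task_a, task_b], hold_rew_when_done=True, verbose=True)
    # Add a final reward on top, as suggested in the note above
    return FinalRewTask(both, mode=FinalRewMode(time_dependent=True))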

compute_final_rew(state: ndarray, remaining_steps: int) float[source]

Compute the reward / cost on task completion / failure of this task. Since this task holds multiple sub-tasks, the final reward / cost is computed for them, too.

Note

The ParallelTasks class is not a subclass of TaskWrapper, i.e. this function only looks at the immediate sub-tasks.

Parameters:
  • state – current state of the environment

  • remaining_steps – number of time steps left in the episode

Returns:

final reward of all sub-tasks

property env_spec: EnvSpec

Get the specification of the environment the task is in.

has_failed(state: Optional[ndarray] = None) bool[source]

Check if this task has failed.

has_succeeded(state: Optional[ndarray] = None) bool[source]

Check if this task is done. A ParallelTasks instance is successful if all sub-tasks are successful.

Parameters:

state – the environment’s current state

Returns:

True if succeeded

reset(**kwargs)[source]

Reset all tasks.

property rew_fcn: list

Get a list of all reward functions.

property space_des: list

Get a list of all desired spaces.

property state_des: list

Get a list of all desired states.

step_rew(state: ndarray, act: ndarray, remaining_steps: int) float[source]

Get the step reward accumulated from every non-done task.

property tasks: Sequence[Task]

Get the list of tasks.

predefined

create_check_all_boundaries_task(env_spec: EnvSpec, penalty: float) FinalRewTask[source]

Create a task that checks if any of the state space bounds is violated. This checks every limit, not just a subspace of the state space as could happen when using a MaskedTask.

Note

This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.

Parameters:
  • env_spec – environment specification

  • penalty – scalar cost (positive values) for violating the bounds

Returns:

task checking all bounds of the state space

create_collision_task(env_spec: EnvSpec, factor: float) MaskedTask[source]

Create a task which punishes collision costs given a collision model with pairs of bodies. This task only looks at the instantaneous collision cost.

Note

This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.

Parameters:
  • env_spec – environment specification

  • factor – cost / reward function scaling factor

Returns:

masked task that only considers a subspace of all observations

create_flipping_task(env_spec: EnvSpec, obs_labels: Sequence[str], des_angle_delta: float = 1.5707963267948966, endless: bool = True) MaskedTask[source]

Create a task for rotating an object.

Note

This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.

Parameters:
  • env_spec – environment specification

  • obs_labels – labels for selection, e.g. [‘Box_A’]. This needs to match the observations’ names in RcsPySim

  • des_angle_delta – desired angle to rotate. If reached, the task is reset, and rotating continues.

  • endless – if True, the task will promote endlessly repeated flipping about the desired angle, else only one flip is desired

Returns:

masked task that only considers a subspace of all observations

create_forcemin_task(env_spec: EnvSpec, obs_labels: Sequence[str], Q: ndarray) MaskedTask[source]

Create a task which punishes the amount of used force.

Note

This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.

Parameters:
  • env_spec – environment specification

  • obs_labels – labels for selection, e.g. [‘WristLoadCellLBR_R_Fy’]. This needs to match the observations’ names in RcsPySim

  • Q – weight matrix of dim NxN with N=num_forces for the quadratic force costs

Returns:

masked task that only considers a subspace of all observations

create_goal_dist_distvel_task(env_spec: EnvSpec, ds_label: int, rew_fcn: RewFcn, succ_thold: float = 0.01) MaskedTask[source]

Create a task that rewards minimizing the GoalDistance, as well as its rate of change, of dynamical system movement primitives (see RcsPySim).

Note

This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.

Parameters:
  • env_spec – environment specification

  • ds_label – label of the dynamical system (see RcsPySim)

  • rew_fcn – reward function

  • succ_thold – once the object of interest is closer than this threshold, the task is considered successfully solved

Returns:

masked task that only considers a subspace of all observations

create_goal_dist_task(env_spec: EnvSpec, ds_label: int, rew_fcn: RewFcn, succ_thold: float = 0.01) MaskedTask[source]

Create a task that rewards minimizing the GoalDistance of dynamical system movement primitives (see RcsPySim).

Note

This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.

Parameters:
  • env_spec – environment specification

  • ds_label – label of the dynamical system (see RcsPySim)

  • rew_fcn – reward function

  • succ_thold – once the object of interest is closer than this threshold, the task is considered successfully solved

Returns:

masked task that only considers a subspace of all observations

create_home_pos_task(env_spec: EnvSpec, obs_labels: Sequence[str], state_des: ndarray) MaskedTask[source]

Create a task for moving the robot to a safe position.

Note

This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.

Parameters:
  • env_spec – environment specification

  • obs_labels – labels for selection, e.g. [‘PowerGrasp_R_Y’, ‘PowerGrasp_R_Z’]. This needs to match the observations’ names in RcsPySim

  • state_des – desired state (depends on the coordinate system). If reached, the task is over.

Returns:

masked task that only considers a subspace of all observations

create_lifting_task(env_spec: EnvSpec, obs_labels: Sequence[str], des_height: Union[float, ndarray], succ_thold: float = 0.01) MaskedTask[source]

Create a task for lifting an object.

Note

This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.

Parameters:
  • env_spec – environment specification

  • obs_labels – labels for selection, e.g. [‘Box_Z’]. This needs to match the observations’ names in RcsPySim

  • des_height – desired height of the object (depends on the coordinate system). If reached, the task is over.

  • succ_thold – once the object of interest is closer than this threshold, the task is considered successfully solved

Returns:

masked task that only considers a subspace of all observations

create_task_space_discrepancy_task(env_spec: EnvSpec, rew_fcn: RewFcn) MaskedTask[source]

Create a task which punishes the discrepancy between the actual and the commanded state of the observed body. The observed body is specified in the associated experiment configuration file in RcsPySim. This task only looks at the X and Z coordinates.

Note

This task was designed with an RcsPySim environment in mind, but is not restricted to these environments.

Parameters:
  • env_spec – environment specification

  • rew_fcn – reward function

Returns:

masked task that only considers a subspace of all observations

reward_functions

class AbsErrRewFcn(q: ndarray, r: ndarray)[source]

Bases: RewFcn

Reward function that returns the negative weighted sum of the absolute errors.

Constructor

Parameters:
  • q – weight vector for the state errors

  • r – weight vector for the action errors

class CompoundRewFcn(rew_fcns: Sequence)[source]

Bases: RewFcn

Combine multiple reward functions

Constructor

Parameters:

rew_fcns – sequence, e.g. list or tuple, of reward functions to combine

reset(*args, **kwargs)[source]

Reset internal members. This function is called from the Task.reset() function. The default implementation does nothing.

class CosOfOneEleRewFcn(idx: int)[source]

Bases: RewFcn

Reward function that takes the cosine of one element of the state, given by an index. Maximum reward of +1 at state[idx] = +/- state_des[idx], minimum reward of -1 at state[idx] = 0.

Constructor

Parameters:

idx – index of the element (angle) of interest

class ExpQuadrErrRewFcn(Q: Union[ndarray, list], R: Union[ndarray, list])[source]

Bases: QuadrErrRewFcn

Reward function that returns the exp of the weighted sum of squared errors

Constructor

Parameters:
  • Q – weight matrix for the state errors (positive semi-definite)

  • R – weight matrix for the action errors (positive definite)
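
Example (hand-picked weights for a hypothetical 4D state [x, theta, x_dot, theta_dot] and a 1D action; the magnitudes only illustrate how to express which errors matter most):

import numpy as np

from pyrado.tasks.reward_functions import ExpQuadrErrRewFcn

Q = np.diag([10.0, 10.0, 1e-2, 5e-3])  # penalize position and angle errors the most
R = 1e-3 * np.eye(1)                   # small action penalty
rew_fcn = ExpQuadrErrRewFcn(Q, R)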

class ForwardVelocityRewFcn(dt: float, idx_fwd: int, fwd_rew_weight: float, ctrl_cost_weight: float)[source]

Bases: RewFcn

Reward function for the HalfCheetahSim and SwimmerSim environments, encouraging the agent to run forward

Note

OpenAI Gym calculates the velocity via forward differences, while here we get the velocity directly from the simulator.

Constructor

Note

The last x position, which is rewarded, is initialized by reset(), since the (sampled) initial state is unknown at construction time of the task, i.e. this reward function.

Parameters:
  • dt – simulation step size [s]

  • idx_fwd – index of the state dimension that marks the forward direction

  • fwd_rew_weight – scaling factor for the forward velocity reward

  • ctrl_cost_weight – scaling factor for the control cost

reset(init_state, **kwargs)[source]

Reset internal members. This function is called from the Task.reset() function. The default implementation does nothing.

class ForwardVelocityRewFcnAnt(dt: float, contact_force_range: Tuple[float], contact_cost_weight: float = 0.0005, ctrl_cost_weight: float = 0.5, healthy_reward: float = 1.0, terminate_when_unhealthy: bool = True, healthy_z_range: Tuple[float] = (0.2, 1.0))[source]

Bases: RewFcn

Reward function for the AntSim environment, encouraging the agent to run forward

Note

OpenAI Gym calculates the velocity via forward differences, while here we get the velocity directly from the simulator.

Constructor

Note

The last x position, which is rewarded, is initialized by reset(), since the (sampled) initial state is unknown at construction time of the task, i.e. this reward function.

Parameters:
  • dt – simulation step size [s]

  • contact_force_range – range to which the contact forces are clipped

  • contact_cost_weight – scaling factor for the contact cost

  • ctrl_cost_weight – scaling factor for the control cost

  • healthy_reward – reward for not terminating, i.e. being healthy

  • terminate_when_unhealthy – if True, the episode terminates as soon as the agent is unhealthy

  • healthy_z_range – range of the torso’s z position within which the agent counts as healthy

contact_forces(raw_contact_forces)[source]
property healthy_reward
property is_healthy: bool
reset(init_state, **kwargs)[source]

Reset internal members. This function is called from the Task.reset() function. The default implementation does nothing.

class ForwardVelocityRewFcnHumanoid(dt: float, contact_cost_weight: float = 5e-07, ctrl_cost_weight: float = 0.1, forward_reward_weight: float = 1.25, healthy_reward: float = 5.0, terminate_when_unhealthy: bool = True, healthy_z_range: Tuple[float] = (1.0, 2.0), contact_cost_range: Tuple[float] = (-inf, 10.0))[source]

Bases: RewFcn

Reward function for the HumanoidSim environment, encouraging the agent to run forward

Note

OpenAI Gym calculates the velocity via forward differences, while here we get the velocity directly from the simulator.

Constructor

Note

The last x position, which is rewarded, is initialized by reset(), since the (sampled) initial state is unknown at construction time of the task, i.e. this reward function.

Parameters:
  • dt – simulation step size [s]

  • contact_cost_weight – scaling factor for the contact cost

  • ctrl_cost_weight – scaling factor for the control cost

  • forward_reward_weight – scaling factor for the forward velocity reward

  • healthy_reward – reward for not terminating, i.e. being healthy

  • terminate_when_unhealthy – if True, the episode terminates as soon as the agent is unhealthy

  • healthy_z_range – range of the torso’s z position within which the agent counts as healthy

  • contact_cost_range – range to which the contact cost is clipped

property healthy_reward
property is_healthy: bool
reset(init_state, **kwargs)[source]

Reset internal members. This function is called from the Task.reset() function. The default implementation does nothing.

class MinusOnePerStepRewFcn[source]

Bases: RewFcn

Reward function that yields -1 reward every time step. A positive or negative final reward can be specified on the Task-level.

class PlusOnePerStepRewFcn[source]

Bases: RewFcn

Reward function that yields +1 reward every time step. A positive or negative final reward can be specified on the Task-level.

class QCartPoleSwingUpRewFcn(factor: float = 0.9)[source]

Bases: RewFcn

Custom reward function for QCartPoleSwingUpSim.

Constructor

Parameters:

factor – weighting factor of rotation error to position error

class QuadrErrRewFcn(Q: Union[ndarray, list], R: Union[ndarray, list])[source]

Bases: RewFcn

Reward function that returns the negative weighted sum of squared errors.

Constructor

Parameters:
  • Q – weight matrix for the state errors (positive semi-definite)

  • R – weight matrix for the action errors (positive definite)

class RewFcn[source]

Bases: ABC

Base class for all reward functions

reset(*args, **kwargs)[source]

Reset internal members. This function is called from the Task.reset() function. The default implementation does nothing.

class SSCosOfOneEleRewFcn(idx: int)[source]

Bases: CosOfOneEleRewFcn

Reward function that takes the shifted and scaled cosine of one element of the state, given by an index. Maximum reward of +1 at state[idx] = +/- state_des[idx], minimum reward of 0 at state[idx] = 0.

Constructor

Parameters:

idx – index of the element (angle) of interest

class ScaledExpQuadrErrRewFcn(Q: Union[ndarray, list], R: Union[ndarray, list], state_space: Space, act_space: Space, min_rew: float = 0.0001)[source]

Bases: QuadrErrRewFcn

Reward function that returns the exp of the scaled weighted sum of squared errors

Constructor

Note

This reward function type depends on environment-specific parameters. Due to domain randomization, we have to re-initialize the reward function after every randomization of the environment, since obs_max and act_max can change when the domain parameters are randomized.

Parameters:
  • Q – weight matrix for the state errors (positive semi-definite)

  • R – weight matrix for the action errors (positive definite)

  • state_space – for extracting the worst case (max cost) observation

  • act_space – for extracting the worst case (max cost) action

  • min_rew – minimum reward (only used for the scaling factor in the exponential reward function)

reset(state_space: Space, act_space: Space, min_rew=0.0001, **kwargs)[source]

Reset internal members. This function is called from the Task.reset() function. The default implementation does nothing.

class StateBasedRewFcn(fcn: Callable[[ndarray], float], flip_sign: bool = False)[source]

Bases: object

Reward function which directly operates on the state a.k.a. solution. This class is supposed to be used for wrapping classical optimization problems into Pyrado, thus it is the negative of the loss function.

Constructor

Parameters:
  • fcn – function for evaluating the state a.k.a. solution

  • flip_sign – return the negative of fcn, useful for turning minimization problems into maximization problems

class UnderActuatedSwingUpRewFcn(c_pole: float = 1.0, c_cart: float = 0.2, c_act: float = 0.001, c_theta_sq: float = 1.0, c_theta_log: float = 0.1, idx_x: int = 0, idx_th: int = 1)[source]

Bases: RewFcn

Reward function for the swing-up task on the Cart-Pole system, similar to [1].

See also

[1] W. Yu, J. Tan, C.K. Liu, G. Turk, “Preparing for the Unknown: Learning a Universal Policy with Online System Identification”, RSS, 2017

Constructor

Parameters:
  • c_pole – scaling parameter for the pole angle cost

  • c_cart – scaling parameter for the cart position cost

  • c_act – scaling parameter for the control cost

  • c_theta_sq – scaling parameter for the quadratic angle deviation

  • c_theta_log – shifting parameter for the logarithm of the quadratic angle deviation

  • idx_x – index of the state representing the driving component of the system (e.g. cart position x)

  • idx_th – index of the state representing the rotating component of the system (e.g. pole angle theta)

class ZeroPerStepRewFcn[source]

Bases: RewFcn

Reward function that yields 0 reward every time step. A positive or negative final reward can be specified on the Task-level.

sequential

class SequentialTasks(tasks: Sequence[Task], start_idx: int = 0, hold_rew_when_done: bool = False, verbose: bool = False)[source]

Bases: Task

Task class for a sequence of tasks a.k.a. goals

Constructor

Parameters:
  • tasks – sequence of tasks a.k.a. goals, the order matters

  • start_idx – index of the task to start with, by default with the first one in the list

  • hold_rew_when_done – if True reward values for done tasks will be stored and added every step

  • verbose – print messages on task completion

Note

hold_rew_when_done=True only makes sense for positive rewards.
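
Example (a chaining sketch; the import path follows this page's module names, and reach_task and lift_task stand for any two already constructed tasks):

from pyrado.tasks.sequential import SequentialTasks

def chain_tasks(reach_task, lift_task) -> SequentialTasks:
    # lift_task only becomes active once reach_task has succeeded
    return SequentialTasks([reach_task, lift_task], start_idx=0, verbose=True)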

compute_final_rew(state: ndarray, remaining_steps: int) float[source]

Compute the reward / cost on task completion / failure of this task. Since this task holds multiple sub-tasks, the final reward / cost is computed for them, too.

Note

The SequentialTasks class is not a subclass of TaskWrapper, i.e. this function only looks at the immediate sub-tasks.

Parameters:
  • state – current state of the environment

  • remaining_steps – number of time steps left in the episode

Returns:

final reward of all sub-tasks

property env_spec: EnvSpec

Get the specification of the environment the task is in.

has_succeeded(state: ndarray) bool[source]

Check if this task is done. A SequentialTasks instance is successful if all sub-tasks are successful.

Parameters:

state – the environment’s current state

Returns:

True if succeeded

property idx_curr: int

Get the index of the currently active task.

reset(**kwargs)[source]

Reset all tasks.

property rew_fcn: RewFcn

Get the reward function of the current task.

property space_des: Space

Get the desired space of the current task.

property state_des: ndarray

Get the desired state of the current task.

step_rew(state: ndarray, act: ndarray, remaining_steps: int) float[source]

Get the step reward from the current task.

property tasks: Sequence[Task]

Get the list of tasks.

utils

never_succeeded(err: Optional[ndarray] = None) bool[source]

The task is never marked successful, i.e. it runs until the environment’s state or time is out of bounds.

proximity_succeeded(err: ndarray, thold_dist: float, dims: int = -1) bool[source]

The task is done successfully if the L2-norm of the selected dimensions of the state error is smaller than some threshold.

Parameters:
  • err – error in state, i.e. difference between desired and current state

  • thold_dist – threshold for being solved

  • dims – selected dimensions, i.e. indices, for the state, by default all dimensions are selected

Returns:

True if successful
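
Example (binding the threshold with functools.partial so the function can serve as a success_fcn that is called with the state error only; the 5 cm threshold and the list-valued dims are assumptions for illustration):

import functools

from pyrado.tasks.utils import proximity_succeeded

# Only the first two error dimensions count towards the success check
success_fcn = functools.partial(proximity_succeeded, thold_dist=0.05, dims=[0, 1])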

Module contents