# Copyright (c) 2020, Fabio Muratore, Honda Research Institute Europe GmbH, and
# Technical University of Darmstadt.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of Fabio Muratore, Honda Research Institute Europe GmbH,
# or Technical University of Darmstadt, nor the names of its contributors may
# be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL FABIO MURATORE, HONDA RESEARCH INSTITUTE EUROPE GMBH,
# OR TECHNICAL UNIVERSITY OF DARMSTADT BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from copy import deepcopy
from typing import Sequence, Union
import numpy as np
import pyrado
from pyrado.tasks.base import Task
from pyrado.utils.data_types import EnvSpec
from pyrado.utils.input_output import print_cbt
class ParallelTasks(Task):
    """Task class for a set of tasks a.k.a. goals which can be achieved in any order or in parallel"""

    def __init__(
        self,
        tasks: Sequence[Task],
        hold_rew_when_done: bool = False,
        allow_failures: bool = False,
        easily_satisfied: bool = False,
        verbose: bool = False,
    ):
        """
        Constructor

        :param tasks: sequence of tasks a.k.a. goals. The position of a task in this sequence is the index used by
                      the bookkeeping arrays `succeeded_tasks`, `failed_tasks`, and `held_rews`, as well as by the
                      lists returned from `state_des`, `space_des`, and `rew_fcn`.
        :param hold_rew_when_done: if `True` reward values for done tasks will be stored and added every step
        :param allow_failures: if `True` this allows to continue after one sub-task failed, by default `False`
        :param easily_satisfied: if `True` one successful subtask is enough to make the complete task successful,
                                 by default `False`. Use this wisely.
        :param verbose: print messages on task completion (for every sub-task as well as for the complete task)

        .. note::
            This task can also be wrapped by a `FinalRewTask` to enjoy modularity.
            `hold_rew_when_done=True` only makes sense for positive rewards.
        """
        # The sequence is stored by reference, i.e. the sub-tasks are not copied here
        self._tasks = tasks

        # One flag per sub-task, set once the sub-task has been detected as succeeded or failed
        self.succeeded_tasks = np.full(len(self), False, dtype=bool)
        self.failed_tasks = np.full(len(self), False, dtype=bool)

        self.hold_rew_when_done = hold_rew_when_done
        self.easily_satisfied = easily_satisfied
        if self.hold_rew_when_done:
            # Only exists if rewards are held; stores the step reward of each sub-task at the time it became done
            self.held_rews = np.zeros(len(self))
        self.allow_failures = allow_failures
        self.verbose = verbose

    def __len__(self) -> int:
        """Get the number of sub-tasks."""
        return len(self._tasks)

    @property
    def env_spec(self) -> EnvSpec:
        """Get the environment specification of the first sub-task."""
        return self._tasks[0].env_spec  # assumes that all tasks have the same env_spec

    @property
    def tasks(self) -> Sequence[Task]:
        """Get a deep copy of the sequence of tasks."""
        return deepcopy(self._tasks)

    @property
    def state_des(self) -> list:
        """Get a list of all desired states. Sub-tasks without a `state_des` attribute are skipped."""
        return [task.state_des for task in self._tasks if hasattr(task, "state_des")]

    @state_des.setter
    def state_des(self, states_des: Union[list, tuple]):
        """
        Set all desired states from a list of desired states.

        :param states_des: one desired state for every sub-task that has a `state_des` attribute, in task order
        """
        if not isinstance(states_des, (list, tuple)):
            # Explicitly require to use a list or tuple to avoid setting just one desired state
            raise pyrado.TypeErr(given=states_des, expected_type=[list, tuple])

        curr_states_des = self.state_des
        if len(states_des) != len(curr_states_des):
            raise pyrado.ShapeErr(given=states_des, expected_match=curr_states_des)

        # Separate counter, since tasks without a desired state do not consume an entry of the given list
        i = 0
        for task in self._tasks:
            if hasattr(task, "state_des"):
                task.state_des = states_des[i]
                i += 1

    @property
    def space_des(self) -> list:
        """Get a list of all desired spaces. Sub-tasks without a `space_des` attribute are skipped."""
        return [task.space_des for task in self._tasks if hasattr(task, "space_des")]

    @space_des.setter
    def space_des(self, spaces_des: Sequence):
        """
        Set all desired spaces from a list of desired spaces.

        :param spaces_des: one desired space for every sub-task that has a `space_des` attribute, in task order
        """
        if not isinstance(spaces_des, (list, tuple)):
            # Explicitly require to use a list or tuple to avoid setting just one desired space
            raise pyrado.TypeErr(given=spaces_des, expected_type=[list, tuple])

        curr_spaces_des = self.space_des
        if len(spaces_des) != len(curr_spaces_des):
            raise pyrado.ShapeErr(given=spaces_des, expected_match=curr_spaces_des)

        # Separate counter, since tasks without a desired space do not consume an entry of the given list
        i = 0
        for task in self._tasks:
            if hasattr(task, "space_des"):
                task.space_des = spaces_des[i]
                i += 1

    @property
    def rew_fcn(self) -> list:
        """Get a list of all reward functions."""
        return [task.rew_fcn for task in self._tasks]

    def step_rew(self, state: np.ndarray, act: np.ndarray, remaining_steps: int) -> float:
        """
        Get the step reward accumulated from every non-done task.
        If `hold_rew_when_done` is set, the stored reward of every done task is added as well. Finally, the final
        rewards of all sub-tasks that are detected as done during this call are added.

        :param state: current state of the environment
        :param act: current action
        :param remaining_steps: number of time steps left in the episode
        :return: sum of the step rewards and the final rewards of the sub-tasks that finished at this step
        """
        step_rew = 0.0
        for i, task in enumerate(self._tasks):
            if not (self.succeeded_tasks[i] or self.failed_tasks[i]):
                # Task has not been marked done yet
                step_rew += task.step_rew(state, act, remaining_steps)
            elif self.hold_rew_when_done:
                # Task is done: add the last reward from every done task (also true for failed tasks)
                step_rew += self.held_rews[i]

        # Check if any task is done and update the internal check lists (and the held rewards). The verbosity
        # setting of this task is forwarded, otherwise the per-sub-task messages could never be printed.
        final_rew = self._is_any_task_done(
            state, act, remaining_steps, verbose=self.verbose
        )  # zero if no task finished

        return step_rew + final_rew

    def compute_final_rew(self, state: np.ndarray, remaining_steps: int) -> float:
        """
        Compute the reward / cost on task completion / fail of this task.
        Since this task holds multiple sub-tasks, the final reward / cost is computed for them, too.

        .. note::
            The `ParallelTasks` class is not a subclass of `TaskWrapper`, i.e. this function only looks at the
            immediate sub-tasks.

        :param state: current state of the environment
        :param remaining_steps: number of time steps left in the episode
        :return: final reward of all sub-tasks
        """
        sum_final_rew = 0.0
        for task in self._tasks:
            sum_final_rew += task.compute_final_rew(state, remaining_steps)
        return sum_final_rew

    def reset(self, **kwargs):
        """
        Reset all tasks.

        :param kwargs: keyword arguments forwarded to the `reset()` method of every sub-task
        """
        for task in self._tasks:
            task.reset(**kwargs)

        # Reset internal check list for done tasks
        self.succeeded_tasks = np.full(len(self), False, dtype=bool)
        self.failed_tasks = np.full(len(self), False, dtype=bool)

        # Reset the stored reward values for done tasks
        if self.hold_rew_when_done:
            self.held_rews = np.zeros(len(self))

    def _is_any_task_done(
        self, state: np.ndarray, act: np.ndarray, remaining_steps: int, verbose: bool = False
    ) -> float:
        """
        Check if any of the not yet finished tasks is done now. Every such task is marked as succeeded or failed,
        and its final reward is collected.

        :param state: current state
        :param act: current action
        :param remaining_steps: number of time steps left in the episode
        :param verbose: if `True`, print a message for every sub-task that succeeded or failed during this call
        :return: sum of the final rewards of all sub-tasks that finished during this call, zero if there were none
        """
        task_final_rew = 0.0
        for i, task in enumerate(self._tasks):
            already_done = self.succeeded_tasks[i] or self.failed_tasks[i]
            if already_done or not task.is_done(state):
                continue

            # Task has not been marked done yet, but is now done
            if task.has_succeeded(state):
                # Check off successfully completed tasks
                self.succeeded_tasks[i] = True
                if verbose:
                    print_cbt(f"task {i} has succeeded (is done) at state {state}", "g")

            elif task.has_failed(state):
                # Check off unsuccessfully completed tasks
                self.failed_tasks[i] = True
                if verbose:
                    print_cbt(f"Task {i} has failed (is done) at state {state}", "r")

            else:
                raise pyrado.ValueErr(msg=f"Task {i} neither succeeded or failed but is done!")

            # Give a reward for completing the task defined by the task
            task_final_rew += task.final_rew(state, remaining_steps)  # there could be more than one finished task

            if self.hold_rew_when_done:
                # Memorize current reward (only becomes active after success or fail)
                self.held_rews[i] = task.step_rew(state, act, remaining_steps)

        return task_final_rew

    def has_succeeded(self, state: np.ndarray = None) -> bool:
        """
        Check if this task has succeeded. By default, the ParallelTasks is successful if all sub-tasks are
        successful. If `easily_satisfied` is set, one successful sub-task is sufficient.

        .. note::
            The result is based on the internal check list, which is only updated when `step_rew()` is called.

        :param state: environments current state (not used here)
        :return: `True` if succeeded
        """
        if self.easily_satisfied:
            # Mark done if one subtask is done
            successful = bool(np.any(self.succeeded_tasks))
        else:
            # Mark done if every subtask is done
            successful = bool(np.all(self.succeeded_tasks))

        if successful and self.verbose:
            print_cbt(f"All {len(self)} parallel sub-tasks are done successfully", "g")
        return successful

    def has_failed(self, state: np.ndarray = None) -> bool:
        """
        Check if this task has failed.

        .. note::
            The result is based on the internal check list, which is only updated when `step_rew()` is called.

        :param state: environments current state (not used here)
        :return: `True` if failed
        """
        if self.allow_failures:
            # The ParallelTasks fail if all of them fail
            return bool(np.all(self.failed_tasks))
        # The ParallelTasks fail if one of them fails
        return bool(np.any(self.failed_tasks))