Source code for pyrado.tasks.sequential

# Copyright (c) 2020, Fabio Muratore, Honda Research Institute Europe GmbH, and
# Technical University of Darmstadt.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of Fabio Muratore, Honda Research Institute Europe GmbH,
#    or Technical University of Darmstadt, nor the names of its contributors may
#    be used to endorse or promote products derived from this software without
#    specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL FABIO MURATORE, HONDA RESEARCH INSTITUTE EUROPE GMBH,
# OR TECHNICAL UNIVERSITY OF DARMSTADT BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from copy import deepcopy
from typing import Sequence

import numpy as np

import pyrado
from pyrado.spaces.base import Space
from pyrado.tasks.base import Task
from pyrado.tasks.reward_functions import RewFcn
from pyrado.utils.data_types import EnvSpec
from pyrado.utils.input_output import print_cbt


[docs]class SequentialTasks(Task): """Task class for a sequence of tasks a.k.a. goals""" def __init__( self, tasks: Sequence[Task], start_idx: int = 0, hold_rew_when_done: bool = False, verbose: bool = False ): """ Constructor :param tasks: sequence of tasks a.k.a. goals, the order matters :param start_idx: index of the task to start with, by default with the first one in the list :param hold_rew_when_done: if `True` reward values for done tasks will be stored and added every step :param verbose: print messages on task completion .. note:: `hold_rew_when_done=True` only makes sense for positive rewards. """ self._tasks = tasks self._idx_curr = start_idx self.succeeded_tasks = np.full(len(self), False, dtype=bool) self.failed_tasks = np.full(len(self), False, dtype=bool) self.succeeded_tasks[:start_idx] = True # check off tasks which are before the start task self.hold_rew_when_done = hold_rew_when_done if self.hold_rew_when_done: self.held_rews = np.zeros(len(self)) self.verbose = verbose def __len__(self) -> int: return len(self._tasks) @property def env_spec(self) -> EnvSpec: return self._tasks[0].env_spec # safe to assume that all tasks have the same env_spec @property def tasks(self) -> Sequence[Task]: """Get the list of tasks.""" return deepcopy(self._tasks) @property def idx_curr(self) -> int: """Get the index of the currently active task.""" return self._idx_curr @idx_curr.setter def idx_curr(self, idx: int): """Set the index of the currently active task.""" if not (0 <= idx < len(self)): raise pyrado.ValueErr(given=idx, ge_constraint="0", le_constraint=f"{len(self) - 1}") self._idx_curr = idx @property def state_des(self) -> np.ndarray: """Get the desired state the current task.""" return self._tasks[self._idx_curr].state_des @state_des.setter def state_des(self, state_des: np.ndarray): """Set the desired state the current task.""" if not isinstance(state_des, np.ndarray): raise pyrado.TypeErr(given=state_des, expected_type=np.ndarray) self._tasks[self._idx_curr].state_des = state_des @property def space_des(self) -> Space: """Get the desired space the current task.""" return self._tasks[self._idx_curr].space_des @space_des.setter def space_des(self, space_des: Space): """Set the desired space the current task.""" if not isinstance(space_des, Space): raise pyrado.TypeErr(given=space_des, expected_type=Space) self._tasks[self._idx_curr].space_des = space_des @property def rew_fcn(self) -> RewFcn: """Get the reward function of the current task.""" return self._tasks[self._idx_curr].rew_fcn
[docs] def step_rew(self, state: np.ndarray, act: np.ndarray, remaining_steps: int) -> float: """Get the step reward from the current task.""" step_rew = 0.0 if self.hold_rew_when_done: for i in range(len(self)): # Iterate over previous tasks if self.succeeded_tasks[i] or self.failed_tasks[i]: # Add the last reward from every done task (also true for failed tasks) step_rew += self.held_rews[i] if not (self.succeeded_tasks[self._idx_curr] or self.failed_tasks[self._idx_curr]): # Only give step reward if current sub-task is active step_rew += self._tasks[self._idx_curr].step_rew(state, act, remaining_steps) final_rew = self._is_curr_task_done(state, act, remaining_steps) # zero if the task is not done # self.logger.add_value('successful tasks', self.successful_tasks) return step_rew + final_rew
[docs] def compute_final_rew(self, state: np.ndarray, remaining_steps: int) -> float: """ Compute the reward / cost on task completion / fail of this task. Since this task holds multiple sub-tasks, the final reward / cost is computed for them, too. .. note:: The `ParallelTasks` class is not a subclass of `TaskWrapper`, i.e. this function only looks at the immediate sub-tasks. :param state: current state of the environment :param remaining_steps: number of time steps left in the episode :return: final reward of all sub-tasks """ sum_final_rew = 0.0 for t in self._tasks: sum_final_rew += t.compute_final_rew(state, remaining_steps) return sum_final_rew
[docs] def reset(self, **kwargs): """Reset all tasks.""" self.idx_curr = 0 for s in self._tasks: s.reset(**kwargs) # Reset internal check list for done tasks self.succeeded_tasks = np.full(len(self), False, dtype=bool) self.failed_tasks = np.full(len(self), False, dtype=bool) if "start_idx" in kwargs: self.succeeded_tasks[: kwargs["start_idx"]] = True # Reset the stored reward values for done tasks if self.hold_rew_when_done: self.held_rews = np.zeros(len(self)) # doesn't work with start_idx
def _is_curr_task_done( self, state: np.ndarray, act: np.ndarray, remaining_steps: int, verbose: bool = False ) -> float: """ Check if the current task is done. If so, move to the next one and return the final reward of this task. :param state: current state :param act: current action :param remaining_steps: number of time steps left in the episode :param verbose: print messages on success or failure :return: final return of the current subtask """ if ( not self.succeeded_tasks[self._idx_curr] and not self.failed_tasks[self._idx_curr] and self._tasks[self._idx_curr].is_done(state) ): # Task has not been marked done yet, but is now done if self._tasks[self._idx_curr].has_succeeded(state): # Check off successfully completed task self.succeeded_tasks[self._idx_curr] = True if verbose: print_cbt(f"task {self._idx_curr} has succeeded (is done) at state {state}", "g") elif self._tasks[self._idx_curr].has_failed(state): # Check off unsuccessfully completed task self.failed_tasks[self._idx_curr] = True if verbose: print_cbt(f"Task {self._idx_curr} has failed (is done) at state {state}", "r") else: raise pyrado.ValueErr(msg=f"Task {self._idx_curr} neither succeeded or failed but is done!") # Memorize current reward if self.hold_rew_when_done: self.held_rews[self._idx_curr] = self._tasks[self._idx_curr].step_rew(state, act, remaining_steps=0) # Give a reward for completing the task defined by the task task_final_rew = self._tasks[self._idx_curr].final_rew(state, remaining_steps) # Advance to the next task self.idx_curr = (self._idx_curr + 1) % len(self) else: task_final_rew = 0.0 return task_final_rew
[docs] def has_succeeded(self, state: np.ndarray) -> bool: """ Check if this tasks is done. The SequentialTasks is successful if all sub-tasks are successful. :param state: environments current state :return: `True` if succeeded """ successful = np.all(self.succeeded_tasks) if successful and self.verbose: print_cbt(f"All {len(self)} sequential sub-tasks are done successfully", "g") return successful