# Source code for pyrado.sampling.parallel_evaluation

# Copyright (c) 2020, Fabio Muratore, Honda Research Institute Europe GmbH, and
# Technical University of Darmstadt.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of Fabio Muratore, Honda Research Institute Europe GmbH,
#    or Technical University of Darmstadt, nor the names of its contributors may
#    be used to endorse or promote products derived from this software without
#    specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL FABIO MURATORE, HONDA RESEARCH INSTITUTE EUROPE GMBH,
# OR TECHNICAL UNIVERSITY OF DARMSTADT BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

# Copyright (c) 2020, Fabio Muratore, Honda Research Institute Europe GmbH, and
# Technical University of Darmstadt.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of Fabio Muratore, Honda Research Institute Europe GmbH,
#    or Technical University of Darmstadt, nor the names of its contributors may
#    be used to endorse or promote products derived from this software without
#    specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL FABIO MURATORE, HONDA RESEARCH INSTITUTE EUROPE GMBH,
# OR TECHNICAL UNIVERSITY OF DARMSTADT BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import functools
import pickle
import sys
from typing import Dict, List, Optional

import numpy as np
from tqdm import tqdm

from pyrado.domain_randomization.domain_randomizer import DomainRandomizer
from pyrado.environment_wrappers.domain_randomization import DomainRandWrapperLive, remove_all_dr_wrappers
from pyrado.environments.sim_base import SimEnv
from pyrado.policies.base import Policy
from pyrado.sampling.parallel_rollout_sampler import (
    NO_SEED,
    _ps_init,
    _ps_run_one_domain_param,
    _ps_run_one_init_state,
    _ps_run_one_reset_kwargs_segment,
)
from pyrado.sampling.sampler_pool import SamplerPool
from pyrado.sampling.step_sequence import StepSequence, check_act_equal
from pyrado.spaces.singular import SingularStateSpace


def eval_domain_params(
    pool: SamplerPool,
    env: SimEnv,
    policy: Policy,
    params: List[Dict],
    init_state: Optional[np.ndarray] = None,
    seed: int = NO_SEED,
) -> List[StepSequence]:
    """
    Roll out a policy once for every entry of a multidimensional grid of domain parameters.

    :param pool: parallel sampler whose workers execute the rollouts
    :param env: environment to evaluate in, all domain randomization wrappers are removed before sampling
    :param policy: policy to evaluate
    :param params: multidimensional grid of domain parameters, every entry is handed to a worker together with
                   its index
    :param init_state: initial state of the environment which will be fixed if not set to `None`
    :param seed: seed forwarded to the rollout function of every worker, by default `NO_SEED`
    :return: list of rollouts
    """
    # The evaluation is run without any domain randomization wrapper around the environment
    env = remove_all_dr_wrappers(env, verbose=True)

    # Pin the initial state if the caller provided one
    if init_state is not None:
        env.init_space = SingularStateSpace(fixed_state=init_state)

    # Hand serialized copies of the environment and the policy to all workers
    pickled_env = pickle.dumps(env)
    pickled_policy = pickle.dumps(policy)
    pool.invoke_all(_ps_init, pickled_env, pickled_policy)

    # Sample while showing a progress bar
    with tqdm(leave=False, file=sys.stdout, unit="rollouts", desc="Sampling") as pb:
        # The sub seed is fixed to zero since every run will have its personal sub sub seed
        run_one = functools.partial(_ps_run_one_domain_param, eval=True, seed=seed, sub_seed=0)
        indexed_params = list(enumerate(params))
        return pool.run_map(run_one, indexed_params, pb)
def eval_nominal_domain(
    pool: SamplerPool,
    env: SimEnv,
    policy: Policy,
    init_states: List[np.ndarray],
) -> List[StepSequence]:
    """
    Roll out a policy using the nominal domain parameters, i.e. the ones set in the given environment.

    :param pool: parallel sampler whose workers execute the rollouts
    :param env: environment to evaluate in, all domain randomization wrappers are removed before sampling
    :param policy: policy to evaluate
    :param init_states: initial states of the environment, every entry is handed to a worker together with its index
    :return: list of rollouts
    """
    # The evaluation is run without any domain randomization wrapper around the environment
    env = remove_all_dr_wrappers(env)

    # Hand serialized copies of the environment and the policy to all workers
    pickled_env = pickle.dumps(env)
    pickled_policy = pickle.dumps(policy)
    pool.invoke_all(_ps_init, pickled_env, pickled_policy)

    # Sample while showing a progress bar
    with tqdm(leave=False, file=sys.stdout, unit="rollouts", desc="Sampling") as pb:
        run_one = functools.partial(_ps_run_one_init_state, eval=True)
        indexed_init_states = list(enumerate(init_states))
        return pool.run_map(run_one, indexed_init_states, pb)
def eval_randomized_domain(
    pool: SamplerPool,
    env: SimEnv,
    randomizer: DomainRandomizer,
    policy: Policy,
    init_states: List[np.ndarray],
) -> List[StepSequence]:
    """
    Roll out a policy in a randomized domain.

    :param pool: parallel sampler whose workers execute the rollouts
    :param env: environment to evaluate in, existing domain randomization wrappers are replaced by a single
                `DomainRandWrapperLive` using the given randomizer
    :param randomizer: randomizer used to sample random domain instances, inherited from `DomainRandomizer`
    :param policy: policy to evaluate
    :param init_states: initial states of the environment, every entry is handed to a worker together with its index
    :return: list of rollouts
    """
    # Replace whatever randomization wrappers are present by one live randomization wrapper
    bare_env = remove_all_dr_wrappers(env)
    env = DomainRandWrapperLive(bare_env, randomizer)

    # Hand serialized copies of the wrapped environment and the policy to all workers
    pickled_env = pickle.dumps(env)
    pickled_policy = pickle.dumps(policy)
    pool.invoke_all(_ps_init, pickled_env, pickled_policy)

    # Sample while showing a progress bar
    with tqdm(leave=False, file=sys.stdout, unit="rollouts", desc="Sampling") as pb:
        run_one = functools.partial(_ps_run_one_init_state, eval=True)
        indexed_init_states = list(enumerate(init_states))
        return pool.run_map(run_one, indexed_init_states, pb)
def eval_domain_params_with_segmentwise_reset(
    pool: SamplerPool,
    env_sim: SimEnv,
    policy: Policy,
    segments_real_all: List[List[StepSequence]],
    domain_params_ml_all: List[List[dict]],
    stop_on_done: bool,
    use_rec: bool,
) -> List[List[StepSequence]]:
    """
    Evaluate a policy for a given set of domain parameters, synchronizing the segments' initial states with the given
    target domain segments.

    :param pool: parallel sampler
    :param env_sim: environment to evaluate in
    :param policy: policy to evaluate
    :param segments_real_all: all segments from the target domain rollouts, one list of segments per rollout
    :param domain_params_ml_all: all domain parameters to evaluate over, one list of domain parameter sets per
                                 target domain rollout
    :param stop_on_done: if `True`, the rollouts are stopped as soon as they hit the state or observation space
                         boundaries. This behavior is safe, but can lead to short trajectories which are eventually
                         padded with zeroes. Choose `False` to ignore the boundaries (dangerous on the real system).
    :param use_rec: `True` if pre-recorded actions have been used to generate the rollouts
    :return: list of segments of rollouts, i.e. for every target domain rollout a list with one entry per segment,
             where every entry is the result of running all associated domain parameter sets on that segment
    """
    # Neither the environment nor the policy is modified in this function, so they are serialized once here
    # instead of once per segment. The workers are still re-initialized for every segment below.
    env_sim_pickled = pickle.dumps(env_sim)
    policy_pickled = pickle.dumps(policy)

    # Sample rollouts with the most likely domain parameter sets associated to that observation
    segments_ml_all = []  # all top max likelihood segments for all target domain rollouts
    # NOTE: zip() stops at the shorter of the two lists, i.e. surplus entries in either argument are ignored
    for idx_r, (segments_real, domain_params_ml) in tqdm(
        enumerate(zip(segments_real_all, domain_params_ml_all)),
        total=len(segments_real_all),
        desc="Sampling",
        file=sys.stdout,
        leave=False,
    ):
        segments_ml = []  # all top max likelihood segments for one target domain rollout
        cnt_step = 0

        # Iterate over target domain segments
        for segment_real in segments_real:
            # Initialize workers
            pool.invoke_all(_ps_init, env_sim_pickled, policy_pickled)

            # Run without progress bar
            segments_dp = pool.run_map(
                functools.partial(
                    _ps_run_one_reset_kwargs_segment,
                    init_state=segment_real.states[0, :],
                    len_segment=segment_real.length,
                    stop_on_done=stop_on_done,
                    use_rec=use_rec,
                    idx_r=idx_r,
                    cnt_step=cnt_step,
                    eval=True,
                ),
                domain_params_ml,
            )

            # Sanity checks: every simulated segment must start in the same state as the target domain segment,
            # and, if pre-recorded actions were used, must contain the same actions
            for sdp in segments_dp:
                assert np.allclose(sdp.states[0, :], segment_real.states[0, :])
                if use_rec:
                    check_act_equal(segment_real, sdp, check_applied=hasattr(sdp, "actions_applied"))

            # Increase step counter for next segment, and append all domain parameter segments
            cnt_step += segment_real.length
            segments_ml.append(segments_dp)

        # Append all segments for the current target domain rollout
        segments_ml_all.append(segments_ml)

    return segments_ml_all