123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- #!/usr/bin/env python
- # Copyright (c) 2018-2020 Intel Corporation
- #
- # This work is licensed under the terms of the MIT license.
- # For a copy, see <https://opensource.org/licenses/MIT>.
- """
- This module provide BasicScenario, the basic class of all the scenarios.
- """
- from __future__ import print_function
- import operator
- import py_trees
- import carla
- import srunner.scenariomanager.scenarioatomics.atomic_trigger_conditions as conditions
- from srunner.scenariomanager.carla_data_provider import CarlaDataProvider
- from srunner.scenariomanager.timer import TimeOut
- from srunner.scenariomanager.weather_sim import WeatherBehavior
- from srunner.scenariomanager.scenarioatomics.atomic_behaviors import UpdateAllActorControls
- class BasicScenario(object):
- """
- Base class for user-defined scenario
- """
- def __init__(self, name, ego_vehicles, config, world,
- debug_mode=False, terminate_on_failure=False, criteria_enable=False):
- """
- Setup all relevant parameters and create scenario
- and instantiate scenario manager
- """
- self.other_actors = []
- if not self.timeout: # pylint: disable=access-member-before-definition
- self.timeout = 60 # If no timeout was provided, set it to 60 seconds
- self.criteria_list = [] # List of evaluation criteria
- self.scenario = None
- self.ego_vehicles = ego_vehicles
- self.name = name
- self.config = config
- self.terminate_on_failure = terminate_on_failure
- self._initialize_environment(world)
- # Initializing adversarial actors
- self._initialize_actors(config)
- if CarlaDataProvider.is_sync_mode():
- world.tick()
- else:
- world.wait_for_tick()
- # Setup scenario
- if debug_mode:
- py_trees.logging.level = py_trees.logging.Level.DEBUG
- behavior = self._create_behavior()
- criteria = None
- if criteria_enable:
- criteria = self._create_test_criteria()
- # Add a trigger condition for the behavior to ensure the behavior is only activated, when it is relevant
- behavior_seq = py_trees.composites.Sequence()
- trigger_behavior = self._setup_scenario_trigger(config)
- if trigger_behavior:
- behavior_seq.add_child(trigger_behavior)
- if behavior is not None:
- behavior_seq.add_child(behavior)
- behavior_seq.name = behavior.name
- end_behavior = self._setup_scenario_end(config)
- if end_behavior:
- behavior_seq.add_child(end_behavior)
- self.scenario = Scenario(behavior_seq, criteria, self.name, self.timeout, self.terminate_on_failure)
- def _initialize_environment(self, world):
- """
- Default initialization of weather and road friction.
- Override this method in child class to provide custom initialization.
- """
- # Set the appropriate weather conditions
- world.set_weather(self.config.weather)
- # Set the appropriate road friction
- if self.config.friction is not None:
- friction_bp = world.get_blueprint_library().find('static.trigger.friction')
- extent = carla.Location(1000000.0, 1000000.0, 1000000.0)
- friction_bp.set_attribute('friction', str(self.config.friction))
- friction_bp.set_attribute('extent_x', str(extent.x))
- friction_bp.set_attribute('extent_y', str(extent.y))
- friction_bp.set_attribute('extent_z', str(extent.z))
- # Spawn Trigger Friction
- transform = carla.Transform()
- transform.location = carla.Location(-10000.0, -10000.0, 0.0)
- world.spawn_actor(friction_bp, transform)
- def _initialize_actors(self, config):
- """
- Default initialization of other actors.
- Override this method in child class to provide custom initialization.
- """
- if config.other_actors:
- new_actors = CarlaDataProvider.request_new_actors(config.other_actors)
- if not new_actors:
- raise Exception("Error: Unable to add actors")
- for new_actor in new_actors:
- self.other_actors.append(new_actor)
- def _setup_scenario_trigger(self, config):
- """
- This function creates a trigger maneuver, that has to be finished before the real scenario starts.
- This implementation focuses on the first available ego vehicle.
- The function can be overloaded by a user implementation inside the user-defined scenario class.
- """
- start_location = None
- if config.trigger_points and config.trigger_points[0]:
- start_location = config.trigger_points[0].location # start location of the scenario
- ego_vehicle_route = CarlaDataProvider.get_ego_vehicle_route()
- if start_location:
- if ego_vehicle_route:
- if config.route_var_name is None: # pylint: disable=no-else-return
- return conditions.InTriggerDistanceToLocationAlongRoute(self.ego_vehicles[0],
- ego_vehicle_route,
- start_location,
- 5)
- else:
- check_name = "WaitForBlackboardVariable: {}".format(config.route_var_name)
- return conditions.WaitForBlackboardVariable(name=check_name,
- variable_name=config.route_var_name,
- variable_value=True,
- var_init_value=False)
- return conditions.InTimeToArrivalToLocation(self.ego_vehicles[0],
- 2.0,
- start_location)
- return None
- def _setup_scenario_end(self, config):
- """
- This function adds and additional behavior to the scenario, which is triggered
- after it has ended.
- The function can be overloaded by a user implementation inside the user-defined scenario class.
- """
- ego_vehicle_route = CarlaDataProvider.get_ego_vehicle_route()
- if ego_vehicle_route:
- if config.route_var_name is not None:
- set_name = "Reset Blackboard Variable: {} ".format(config.route_var_name)
- return py_trees.blackboard.SetBlackboardVariable(name=set_name,
- variable_name=config.route_var_name,
- variable_value=False)
- return None
- def _create_behavior(self):
- """
- Pure virtual function to setup user-defined scenario behavior
- """
- raise NotImplementedError(
- "This function is re-implemented by all scenarios"
- "If this error becomes visible the class hierarchy is somehow broken")
- def _create_test_criteria(self):
- """
- Pure virtual function to setup user-defined evaluation criteria for the
- scenario
- """
- raise NotImplementedError(
- "This function is re-implemented by all scenarios"
- "If this error becomes visible the class hierarchy is somehow broken")
- def change_control(self, control): # pylint: disable=no-self-use
- """
- This is a function that changes the control based on the scenario determination
- :param control: a carla vehicle control
- :return: a control to be changed by the scenario.
- Note: This method should be overriden by the user-defined scenario behavior
- """
- return control
- def remove_all_actors(self):
- """
- Remove all actors
- """
- for i, _ in enumerate(self.other_actors):
- if self.other_actors[i] is not None:
- if CarlaDataProvider.actor_id_exists(self.other_actors[i].id):
- CarlaDataProvider.remove_actor_by_id(self.other_actors[i].id)
- self.other_actors[i] = None
- self.other_actors = []
- class Scenario(object):
- """
- Basic scenario class. This class holds the behavior_tree describing the
- scenario and the test criteria.
- The user must not modify this class.
- Important parameters:
- - behavior: User defined scenario with py_tree
- - criteria_list: List of user defined test criteria with py_tree
- - timeout (default = 60s): Timeout of the scenario in seconds
- - terminate_on_failure: Terminate scenario on first failure
- """
- def __init__(self, behavior, criteria, name, timeout=60, terminate_on_failure=False):
- self.behavior = behavior
- self.test_criteria = criteria
- self.timeout = timeout
- self.name = name
- if self.test_criteria is not None and not isinstance(self.test_criteria, py_trees.composites.Parallel):
- # list of nodes
- for criterion in self.test_criteria:
- criterion.terminate_on_failure = terminate_on_failure
- # Create py_tree for test criteria
- self.criteria_tree = py_trees.composites.Parallel(
- name="Test Criteria",
- policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE
- )
- self.criteria_tree.add_children(self.test_criteria)
- self.criteria_tree.setup(timeout=1)
- else:
- self.criteria_tree = criteria
- # Create node for timeout
- self.timeout_node = TimeOut(self.timeout, name="TimeOut")
- # Create overall py_tree
- self.scenario_tree = py_trees.composites.Parallel(name, policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE)
- if behavior is not None:
- self.scenario_tree.add_child(self.behavior)
- self.scenario_tree.add_child(self.timeout_node)
- self.scenario_tree.add_child(WeatherBehavior())
- self.scenario_tree.add_child(UpdateAllActorControls())
- if criteria is not None:
- self.scenario_tree.add_child(self.criteria_tree)
- self.scenario_tree.setup(timeout=1)
- def _extract_nodes_from_tree(self, tree): # pylint: disable=no-self-use
- """
- Returns the list of all nodes from the given tree
- """
- node_list = [tree]
- more_nodes_exist = True
- while more_nodes_exist:
- more_nodes_exist = False
- for node in node_list:
- if node.children:
- node_list.remove(node)
- more_nodes_exist = True
- for child in node.children:
- node_list.append(child)
- if len(node_list) == 1 and isinstance(node_list[0], py_trees.composites.Parallel):
- return []
- return node_list
- def get_criteria(self):
- """
- Return the list of test criteria (all leave nodes)
- """
- criteria_list = self._extract_nodes_from_tree(self.criteria_tree)
- return criteria_list
- def terminate(self):
- """
- This function sets the status of all leaves in the scenario tree to INVALID
- """
- # Get list of all nodes in the tree
- node_list = self._extract_nodes_from_tree(self.scenario_tree)
- # Set status to INVALID
- for node in node_list:
- node.terminate(py_trees.common.Status.INVALID)
- # Cleanup all instantiated controllers
- actor_dict = {}
- try:
- check_actors = operator.attrgetter("ActorsWithController")
- actor_dict = check_actors(py_trees.blackboard.Blackboard())
- except AttributeError:
- pass
- for actor_id in actor_dict:
- actor_dict[actor_id].reset()
- py_trees.blackboard.Blackboard().set("ActorsWithController", {}, overwrite=True)
|