123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404 |
- #!/usr/bin/env python
- # This work is licensed under the terms of the MIT license.
- # For a copy, see <https://opensource.org/licenses/MIT>.
- """
- Object crash without prior vehicle action scenario:
- The scenario realizes the user controlled ego vehicle
- moving along the road and encountering a cyclist ahead.
- """
- from __future__ import print_function
- import math
- import py_trees
- import carla
- from srunner.scenariomanager.carla_data_provider import CarlaDataProvider
- from srunner.scenariomanager.scenarioatomics.atomic_behaviors import (ActorTransformSetter,
- ActorDestroy,
- AccelerateToVelocity,
- HandBrakeVehicle,
- KeepVelocity,
- StopVehicle)
- from srunner.scenariomanager.scenarioatomics.atomic_criteria import CollisionTest
- from srunner.scenariomanager.scenarioatomics.atomic_trigger_conditions import (InTriggerDistanceToLocationAlongRoute,
- InTimeToArrivalToVehicle,
- DriveDistance)
- from srunner.scenariomanager.timer import TimeOut
- from srunner.scenarios.basic_scenario import BasicScenario
- from srunner.tools.scenario_helper import get_location_in_distance_from_wp
- class StationaryObjectCrossing(BasicScenario):
- """
- This class holds everything required for a simple object crash
- without prior vehicle action involving a vehicle and a cyclist.
- The ego vehicle is passing through a road and encounters
- a stationary cyclist.
- This is a single ego vehicle scenario
- """
- def __init__(self, world, ego_vehicles, config, randomize=False, debug_mode=False, criteria_enable=True,
- timeout=60):
- """
- Setup all relevant parameters and create scenario
- """
- self._wmap = CarlaDataProvider.get_map()
- self._reference_waypoint = self._wmap.get_waypoint(config.trigger_points[0].location)
- # ego vehicle parameters
- self._ego_vehicle_distance_driven = 40
- # other vehicle parameters
- self._other_actor_target_velocity = 10
- # Timeout of scenario in seconds
- self.timeout = timeout
- super(StationaryObjectCrossing, self).__init__("Stationaryobjectcrossing",
- ego_vehicles,
- config,
- world,
- debug_mode,
- criteria_enable=criteria_enable)
- def _initialize_actors(self, config):
- """
- Custom initialization
- """
- _start_distance = 40
- lane_width = self._reference_waypoint.lane_width
- location, _ = get_location_in_distance_from_wp(self._reference_waypoint, _start_distance)
- waypoint = self._wmap.get_waypoint(location)
- offset = {"orientation": 270, "position": 90, "z": 0.4, "k": 0.2}
- position_yaw = waypoint.transform.rotation.yaw + offset['position']
- orientation_yaw = waypoint.transform.rotation.yaw + offset['orientation']
- offset_location = carla.Location(
- offset['k'] * lane_width * math.cos(math.radians(position_yaw)),
- offset['k'] * lane_width * math.sin(math.radians(position_yaw)))
- location += offset_location
- location.z += offset['z']
- self.transform = carla.Transform(location, carla.Rotation(yaw=orientation_yaw))
- static = CarlaDataProvider.request_new_actor('static.prop.container', self.transform)
- static.set_simulate_physics(True)
- self.other_actors.append(static)
- def _create_behavior(self):
- """
- Only behavior here is to wait
- """
- lane_width = self.ego_vehicles[0].get_world().get_map().get_waypoint(
- self.ego_vehicles[0].get_location()).lane_width
- lane_width = lane_width + (1.25 * lane_width)
- # leaf nodes
- actor_stand = TimeOut(15)
- actor_removed = ActorDestroy(self.other_actors[0])
- end_condition = DriveDistance(self.ego_vehicles[0], self._ego_vehicle_distance_driven)
- # non leaf nodes
- root = py_trees.composites.Parallel(
- policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE)
- scenario_sequence = py_trees.composites.Sequence()
- # building tree
- root.add_child(scenario_sequence)
- scenario_sequence.add_child(ActorTransformSetter(self.other_actors[0], self.transform))
- scenario_sequence.add_child(actor_stand)
- scenario_sequence.add_child(actor_removed)
- scenario_sequence.add_child(end_condition)
- return root
- def _create_test_criteria(self):
- """
- A list of all test criteria will be created that is later used
- in parallel behavior tree.
- """
- criteria = []
- collision_criterion = CollisionTest(self.ego_vehicles[0])
- criteria.append(collision_criterion)
- return criteria
- def __del__(self):
- """
- Remove all actors upon deletion
- """
- self.remove_all_actors()
- class DynamicObjectCrossing(BasicScenario):
- """
- This class holds everything required for a simple object crash
- without prior vehicle action involving a vehicle and a cyclist/pedestrian,
- The ego vehicle is passing through a road,
- And encounters a cyclist/pedestrian crossing the road.
- This is a single ego vehicle scenario
- """
- def __init__(self, world, ego_vehicles, config, randomize=False,
- debug_mode=False, criteria_enable=True, adversary_type=False, timeout=60):
- """
- Setup all relevant parameters and create scenario
- """
- self._wmap = CarlaDataProvider.get_map()
- self._reference_waypoint = self._wmap.get_waypoint(config.trigger_points[0].location)
- # ego vehicle parameters
- self._ego_vehicle_distance_driven = 40
- # other vehicle parameters
- self._other_actor_target_velocity = 5
- self._other_actor_max_brake = 1.0
- self._time_to_reach = 10
- self._adversary_type = adversary_type # flag to select either pedestrian (False) or cyclist (True)
- self._walker_yaw = 0
- self._num_lane_changes = 1
- self.transform = None
- self.transform2 = None
- self.timeout = timeout
- self._trigger_location = config.trigger_points[0].location
- # Total Number of attempts to relocate a vehicle before spawning
- self._number_of_attempts = 20
- # Number of attempts made so far
- self._spawn_attempted = 0
- self._ego_route = CarlaDataProvider.get_ego_vehicle_route()
- super(DynamicObjectCrossing, self).__init__("DynamicObjectCrossing",
- ego_vehicles,
- config,
- world,
- debug_mode,
- criteria_enable=criteria_enable)
- def _calculate_base_transform(self, _start_distance, waypoint):
- lane_width = waypoint.lane_width
- # Patches false junctions
- if self._reference_waypoint.is_junction:
- stop_at_junction = False
- else:
- stop_at_junction = True
- location, _ = get_location_in_distance_from_wp(waypoint, _start_distance, stop_at_junction)
- waypoint = self._wmap.get_waypoint(location)
- offset = {"orientation": 270, "position": 90, "z": 0.6, "k": 1.0}
- position_yaw = waypoint.transform.rotation.yaw + offset['position']
- orientation_yaw = waypoint.transform.rotation.yaw + offset['orientation']
- offset_location = carla.Location(
- offset['k'] * lane_width * math.cos(math.radians(position_yaw)),
- offset['k'] * lane_width * math.sin(math.radians(position_yaw)))
- location += offset_location
- location.z = self._trigger_location.z + offset['z']
- return carla.Transform(location, carla.Rotation(yaw=orientation_yaw)), orientation_yaw
- def _spawn_adversary(self, transform, orientation_yaw):
- self._time_to_reach *= self._num_lane_changes
- if self._adversary_type is False:
- self._walker_yaw = orientation_yaw
- self._other_actor_target_velocity = 3 + (0.4 * self._num_lane_changes)
- walker = CarlaDataProvider.request_new_actor('walker.*', transform)
- adversary = walker
- else:
- self._other_actor_target_velocity = self._other_actor_target_velocity * self._num_lane_changes
- first_vehicle = CarlaDataProvider.request_new_actor('vehicle.diamondback.century', transform)
- first_vehicle.set_simulate_physics(enabled=False)
- adversary = first_vehicle
- return adversary
- def _spawn_blocker(self, transform, orientation_yaw):
- """
- Spawn the blocker prop that blocks the vision from the egovehicle of the jaywalker
- :return:
- """
- # static object transform
- shift = 0.9
- x_ego = self._reference_waypoint.transform.location.x
- y_ego = self._reference_waypoint.transform.location.y
- x_cycle = transform.location.x
- y_cycle = transform.location.y
- x_static = x_ego + shift * (x_cycle - x_ego)
- y_static = y_ego + shift * (y_cycle - y_ego)
- spawn_point_wp = self.ego_vehicles[0].get_world().get_map().get_waypoint(transform.location)
- self.transform2 = carla.Transform(carla.Location(x_static, y_static,
- spawn_point_wp.transform.location.z + 0.3),
- carla.Rotation(yaw=orientation_yaw + 180))
- static = CarlaDataProvider.request_new_actor('static.prop.vendingmachine', self.transform2)
- static.set_simulate_physics(enabled=False)
- return static
- def _initialize_actors(self, config):
- """
- Custom initialization
- """
- # cyclist transform
- _start_distance = 12
- # We start by getting and waypoint in the closest sidewalk.
- waypoint = self._reference_waypoint
- while True:
- wp_next = waypoint.get_right_lane()
- self._num_lane_changes += 1
- if wp_next is None or wp_next.lane_type == carla.LaneType.Sidewalk:
- break
- elif wp_next.lane_type == carla.LaneType.Shoulder:
- # Filter Parkings considered as Shoulders
- if wp_next.lane_width > 2:
- _start_distance += 1.5
- waypoint = wp_next
- break
- else:
- _start_distance += 1.5
- waypoint = wp_next
- while True: # We keep trying to spawn avoiding props
- try:
- self.transform, orientation_yaw = self._calculate_base_transform(_start_distance, waypoint)
- first_vehicle = self._spawn_adversary(self.transform, orientation_yaw)
- blocker = self._spawn_blocker(self.transform, orientation_yaw)
- break
- except RuntimeError as r:
- # We keep retrying until we spawn
- print("Base transform is blocking objects ", self.transform)
- _start_distance += 0.4
- self._spawn_attempted += 1
- if self._spawn_attempted >= self._number_of_attempts:
- raise r
- # Now that we found a possible position we just put the vehicle to the underground
- disp_transform = carla.Transform(
- carla.Location(self.transform.location.x,
- self.transform.location.y,
- self.transform.location.z - 500),
- self.transform.rotation)
- prop_disp_transform = carla.Transform(
- carla.Location(self.transform2.location.x,
- self.transform2.location.y,
- self.transform2.location.z - 500),
- self.transform2.rotation)
- first_vehicle.set_transform(disp_transform)
- blocker.set_transform(prop_disp_transform)
- first_vehicle.set_simulate_physics(enabled=False)
- blocker.set_simulate_physics(enabled=False)
- self.other_actors.append(first_vehicle)
- self.other_actors.append(blocker)
- def _create_behavior(self):
- """
- After invoking this scenario, cyclist will wait for the user
- controlled vehicle to enter trigger distance region,
- the cyclist starts crossing the road once the condition meets,
- then after 60 seconds, a timeout stops the scenario
- """
- root = py_trees.composites.Parallel(
- policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE, name="OccludedObjectCrossing")
- lane_width = self._reference_waypoint.lane_width
- lane_width = lane_width + (1.25 * lane_width * self._num_lane_changes)
- dist_to_trigger = 12 + self._num_lane_changes
- # leaf nodes
- if self._ego_route is not None:
- start_condition = InTriggerDistanceToLocationAlongRoute(self.ego_vehicles[0],
- self._ego_route,
- self.transform.location,
- dist_to_trigger)
- else:
- start_condition = InTimeToArrivalToVehicle(self.ego_vehicles[0],
- self.other_actors[0],
- self._time_to_reach)
- actor_velocity = KeepVelocity(self.other_actors[0],
- self._other_actor_target_velocity,
- name="walker velocity")
- actor_drive = DriveDistance(self.other_actors[0],
- 0.5 * lane_width,
- name="walker drive distance")
- actor_start_cross_lane = AccelerateToVelocity(self.other_actors[0],
- 1.0,
- self._other_actor_target_velocity,
- name="walker crossing lane accelerate velocity")
- actor_cross_lane = DriveDistance(self.other_actors[0],
- lane_width,
- name="walker drive distance for lane crossing ")
- actor_stop_crossed_lane = StopVehicle(self.other_actors[0],
- self._other_actor_max_brake,
- name="walker stop")
- ego_pass_machine = DriveDistance(self.ego_vehicles[0],
- 5,
- name="ego vehicle passed prop")
- actor_remove = ActorDestroy(self.other_actors[0],
- name="Destroying walker")
- static_remove = ActorDestroy(self.other_actors[1],
- name="Destroying Prop")
- end_condition = DriveDistance(self.ego_vehicles[0],
- self._ego_vehicle_distance_driven,
- name="End condition ego drive distance")
- # non leaf nodes
- scenario_sequence = py_trees.composites.Sequence()
- keep_velocity_other = py_trees.composites.Parallel(
- policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE, name="keep velocity other")
- keep_velocity = py_trees.composites.Parallel(
- policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE, name="keep velocity")
- # building tree
- root.add_child(scenario_sequence)
- scenario_sequence.add_child(ActorTransformSetter(self.other_actors[0], self.transform,
- name='TransformSetterTS3walker'))
- scenario_sequence.add_child(ActorTransformSetter(self.other_actors[1], self.transform2,
- name='TransformSetterTS3coca', physics=False))
- scenario_sequence.add_child(HandBrakeVehicle(self.other_actors[0], True))
- scenario_sequence.add_child(start_condition)
- scenario_sequence.add_child(HandBrakeVehicle(self.other_actors[0], False))
- scenario_sequence.add_child(keep_velocity)
- scenario_sequence.add_child(keep_velocity_other)
- scenario_sequence.add_child(actor_stop_crossed_lane)
- scenario_sequence.add_child(actor_remove)
- scenario_sequence.add_child(static_remove)
- scenario_sequence.add_child(end_condition)
- keep_velocity.add_child(actor_velocity)
- keep_velocity.add_child(actor_drive)
- keep_velocity_other.add_child(actor_start_cross_lane)
- keep_velocity_other.add_child(actor_cross_lane)
- keep_velocity_other.add_child(ego_pass_machine)
- return root
- def _create_test_criteria(self):
- """
- A list of all test criteria will be created that is later used
- in parallel behavior tree.
- """
- criteria = []
- collision_criterion = CollisionTest(self.ego_vehicles[0])
- criteria.append(collision_criterion)
- return criteria
- def __del__(self):
- """
- Remove all actors upon deletion
- """
- self.remove_all_actors()
|