#!/usr/bin/env python # Copyright (c) 2019-2020 Intel Corporation # # This work is licensed under the terms of the MIT license. # For a copy, see . """ Cut in scenario: The scenario realizes a driving behavior on the highway. The user-controlled ego vehicle is driving straight and keeping its velocity at a constant level. Another car is cutting just in front, coming from left or right lane. The ego vehicle may need to brake to avoid a collision. """ import random import py_trees import carla from srunner.scenariomanager.carla_data_provider import CarlaDataProvider from srunner.scenariomanager.scenarioatomics.atomic_behaviors import (ActorTransformSetter, LaneChange, WaypointFollower, AccelerateToCatchUp) from srunner.scenariomanager.scenarioatomics.atomic_criteria import CollisionTest from srunner.scenariomanager.scenarioatomics.atomic_trigger_conditions import InTriggerDistanceToVehicle, DriveDistance from srunner.scenarios.basic_scenario import BasicScenario class CutIn(BasicScenario): """ The ego vehicle is driving on a highway and another car is cutting in just in front. This is a single ego vehicle scenario """ timeout = 1200 def __init__(self, world, ego_vehicles, config, randomize=False, debug_mode=False, criteria_enable=True, timeout=600): self.timeout = timeout self._map = CarlaDataProvider.get_map() self._reference_waypoint = self._map.get_waypoint(config.trigger_points[0].location) self._velocity = 40 self._delta_velocity = 10 self._trigger_distance = 30 # get direction from config name self._config = config self._direction = None self._transform_visible = None super(CutIn, self).__init__("CutIn", ego_vehicles, config, world, debug_mode, criteria_enable=criteria_enable) if randomize: self._velocity = random.randint(20, 60) self._trigger_distance = random.randint(10, 40) def _initialize_actors(self, config): # direction of lane, on which other_actor is driving before lane change if 'LEFT' in self._config.name.upper(): self._direction = 'left' if 'RIGHT' in self._config.name.upper(): self._direction = 'right' # add actors from xml file for actor in config.other_actors: vehicle = CarlaDataProvider.request_new_actor(actor.model, actor.transform) self.other_actors.append(vehicle) vehicle.set_simulate_physics(enabled=False) # transform visible other_actor_transform = self.other_actors[0].get_transform() self._transform_visible = carla.Transform( carla.Location(other_actor_transform.location.x, other_actor_transform.location.y, other_actor_transform.location.z + 105), other_actor_transform.rotation) def _create_behavior(self): """ Order of sequence: - car_visible: spawn car at a visible transform - just_drive: drive until in trigger distance to ego_vehicle - accelerate: accelerate to catch up distance to ego_vehicle - lane_change: change the lane - endcondition: drive for a defined distance """ # car_visible behaviour = py_trees.composites.Sequence("CarOn_{}_Lane" .format(self._direction)) car_visible = ActorTransformSetter(self.other_actors[0], self._transform_visible) behaviour.add_child(car_visible) # just_drive just_drive = py_trees.composites.Parallel( "DrivingStraight", policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE) car_driving = WaypointFollower(self.other_actors[0], self._velocity) just_drive.add_child(car_driving) trigger_distance = InTriggerDistanceToVehicle( self.other_actors[0], self.ego_vehicles[0], self._trigger_distance) just_drive.add_child(trigger_distance) behaviour.add_child(just_drive) # accelerate accelerate = AccelerateToCatchUp(self.other_actors[0], self.ego_vehicles[0], throttle_value=1, delta_velocity=self._delta_velocity, trigger_distance=5, max_distance=500) behaviour.add_child(accelerate) # lane_change if self._direction == 'left': lane_change = LaneChange( self.other_actors[0], speed=None, direction='right', distance_same_lane=5, distance_other_lane=300) behaviour.add_child(lane_change) else: lane_change = LaneChange( self.other_actors[0], speed=None, direction='left', distance_same_lane=5, distance_other_lane=300) behaviour.add_child(lane_change) # endcondition endcondition = DriveDistance(self.other_actors[0], 200) # build tree root = py_trees.composites.Sequence("Behavior", policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE) root.add_child(behaviour) root.add_child(endcondition) return root def _create_test_criteria(self): """ A list of all test criteria is created, which is later used in the parallel behavior tree. """ criteria = [] collision_criterion = CollisionTest(self.ego_vehicles[0]) criteria.append(collision_criterion) return criteria def __del__(self): """ Remove all actors after deletion. """ self.remove_all_actors()