#!/usr/bin/env python # Copyright (c) 2018-2020 Intel Corporation # # This work is licensed under the terms of the MIT license. # For a copy, see . """ This module provides all atomic evaluation criteria required to analyze if a scenario was completed successfully or failed. Criteria should run continuously to monitor the state of a single actor, multiple actors or environmental parameters. Hence, a termination is not required. The atomic criteria are implemented with py_trees. """ import weakref import math import numpy as np import py_trees import shapely.geometry import carla from srunner.scenariomanager.carla_data_provider import CarlaDataProvider from srunner.scenariomanager.timer import GameTime from srunner.scenariomanager.traffic_events import TrafficEvent, TrafficEventType class Criterion(py_trees.behaviour.Behaviour): """ Base class for all criteria used to evaluate a scenario for success/failure Important parameters (PUBLIC): - name: Name of the criterion - expected_value_success: Result in case of success (e.g. max_speed, zero collisions, ...) - expected_value_acceptable: Result that does not mean a failure, but is not good enough for a success - actual_value: Actual result after running the scenario - test_status: Used to access the result of the criterion - optional: Indicates if a criterion is optional (not used for overall analysis) """ def __init__(self, name, actor, expected_value_success, expected_value_acceptable=None, optional=False, terminate_on_failure=False): super(Criterion, self).__init__(name) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) self._terminate_on_failure = terminate_on_failure self.name = name self.actor = actor self.test_status = "INIT" self.expected_value_success = expected_value_success self.expected_value_acceptable = expected_value_acceptable self.actual_value = 0 self.optional = optional self.list_traffic_events = [] def initialise(self): """ Initialise the criterion. Can be extended by the user-derived class """ self.logger.debug("%s.initialise()" % (self.__class__.__name__)) def terminate(self, new_status): """ Terminate the criterion. Can be extended by the user-derived class """ if self.test_status in ('RUNNING', 'INIT'): self.test_status = "SUCCESS" self.logger.debug("%s.terminate()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) class MaxVelocityTest(Criterion): """ This class contains an atomic test for maximum velocity. Important parameters: - actor: CARLA actor to be used for this test - max_velocity_allowed: maximum allowed velocity in m/s - optional [optional]: If True, the result is not considered for an overall pass/fail result """ def __init__(self, actor, max_velocity_allowed, optional=False, name="CheckMaximumVelocity"): """ Setup actor and maximum allowed velovity """ super(MaxVelocityTest, self).__init__(name, actor, max_velocity_allowed, None, optional) def update(self): """ Check velocity """ new_status = py_trees.common.Status.RUNNING if self.actor is None: return new_status velocity = CarlaDataProvider.get_velocity(self.actor) self.actual_value = max(velocity, self.actual_value) if velocity > self.expected_value_success: self.test_status = "FAILURE" else: self.test_status = "SUCCESS" if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status class DrivenDistanceTest(Criterion): """ This class contains an atomic test to check the driven distance Important parameters: - actor: CARLA actor to be used for this test - distance_success: If the actor's driven distance is more than this value (in meters), the test result is SUCCESS - distance_acceptable: If the actor's driven distance is more than this value (in meters), the test result is ACCEPTABLE - optional [optional]: If True, the result is not considered for an overall pass/fail result """ def __init__(self, actor, distance_success, distance_acceptable=None, optional=False, name="CheckDrivenDistance"): """ Setup actor """ super(DrivenDistanceTest, self).__init__(name, actor, distance_success, distance_acceptable, optional) self._last_location = None def initialise(self): self._last_location = CarlaDataProvider.get_location(self.actor) super(DrivenDistanceTest, self).initialise() def update(self): """ Check distance """ new_status = py_trees.common.Status.RUNNING if self.actor is None: return new_status location = CarlaDataProvider.get_location(self.actor) if location is None: return new_status if self._last_location is None: self._last_location = location return new_status self.actual_value += location.distance(self._last_location) self._last_location = location if self.actual_value > self.expected_value_success: self.test_status = "SUCCESS" elif (self.expected_value_acceptable is not None and self.actual_value > self.expected_value_acceptable): self.test_status = "ACCEPTABLE" else: self.test_status = "RUNNING" if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status def terminate(self, new_status): """ Set final status """ if self.test_status != "SUCCESS": self.test_status = "FAILURE" self.actual_value = round(self.actual_value, 2) super(DrivenDistanceTest, self).terminate(new_status) class AverageVelocityTest(Criterion): """ This class contains an atomic test for average velocity. Important parameters: - actor: CARLA actor to be used for this test - avg_velocity_success: If the actor's average velocity is more than this value (in m/s), the test result is SUCCESS - avg_velocity_acceptable: If the actor's average velocity is more than this value (in m/s), the test result is ACCEPTABLE - optional [optional]: If True, the result is not considered for an overall pass/fail result """ def __init__(self, actor, avg_velocity_success, avg_velocity_acceptable=None, optional=False, name="CheckAverageVelocity"): """ Setup actor and average velovity expected """ super(AverageVelocityTest, self).__init__(name, actor, avg_velocity_success, avg_velocity_acceptable, optional) self._last_location = None self._distance = 0.0 def initialise(self): self._last_location = CarlaDataProvider.get_location(self.actor) super(AverageVelocityTest, self).initialise() def update(self): """ Check velocity """ new_status = py_trees.common.Status.RUNNING if self.actor is None: return new_status location = CarlaDataProvider.get_location(self.actor) if location is None: return new_status if self._last_location is None: self._last_location = location return new_status self._distance += location.distance(self._last_location) self._last_location = location elapsed_time = GameTime.get_time() if elapsed_time > 0.0: self.actual_value = self._distance / elapsed_time if self.actual_value > self.expected_value_success: self.test_status = "SUCCESS" elif (self.expected_value_acceptable is not None and self.actual_value > self.expected_value_acceptable): self.test_status = "ACCEPTABLE" else: self.test_status = "RUNNING" if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status def terminate(self, new_status): """ Set final status """ if self.test_status == "RUNNING": self.test_status = "FAILURE" super(AverageVelocityTest, self).terminate(new_status) class CollisionTest(Criterion): """ This class contains an atomic test for collisions. Args: - actor (carla.Actor): CARLA actor to be used for this test - other_actor (carla.Actor): only collisions with this actor will be registered - other_actor_type (str): only collisions with actors including this type_id will count. Additionally, the "miscellaneous" tag can also be used to include all static objects in the scene - terminate_on_failure [optional]: If True, the complete scenario will terminate upon failure of this test - optional [optional]: If True, the result is not considered for an overall pass/fail result """ MIN_AREA_OF_COLLISION = 3 # If closer than this distance, the collision is ignored MAX_AREA_OF_COLLISION = 5 # If further than this distance, the area is forgotten MAX_ID_TIME = 5 # Amount of time the last collision if is remembered def __init__(self, actor, other_actor=None, other_actor_type=None, optional=False, name="CollisionTest", terminate_on_failure=False): """ Construction with sensor setup """ super(CollisionTest, self).__init__(name, actor, 0, None, optional, terminate_on_failure) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) world = self.actor.get_world() blueprint = world.get_blueprint_library().find('sensor.other.collision') self._collision_sensor = world.spawn_actor(blueprint, carla.Transform(), attach_to=self.actor) self._collision_sensor.listen(lambda event: self._count_collisions(weakref.ref(self), event)) self.other_actor = other_actor self.other_actor_type = other_actor_type self.registered_collisions = [] self.last_id = None self.collision_time = None def update(self): """ Check collision count """ new_status = py_trees.common.Status.RUNNING if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE actor_location = CarlaDataProvider.get_location(self.actor) new_registered_collisions = [] # Loops through all the previous registered collisions for collision_location in self.registered_collisions: # Get the distance to the collision point distance_vector = actor_location - collision_location distance = math.sqrt(math.pow(distance_vector.x, 2) + math.pow(distance_vector.y, 2)) # If far away from a previous collision, forget it if distance <= self.MAX_AREA_OF_COLLISION: new_registered_collisions.append(collision_location) self.registered_collisions = new_registered_collisions if self.last_id and GameTime.get_time() - self.collision_time > self.MAX_ID_TIME: self.last_id = None self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status def terminate(self, new_status): """ Cleanup sensor """ if self._collision_sensor is not None: self._collision_sensor.destroy() self._collision_sensor = None super(CollisionTest, self).terminate(new_status) @staticmethod def _count_collisions(weak_self, event): # pylint: disable=too-many-return-statements """ Callback to update collision count """ self = weak_self() if not self: return actor_location = CarlaDataProvider.get_location(self.actor) # Ignore the current one if it is the same id as before if self.last_id == event.other_actor.id: return # Filter to only a specific actor if self.other_actor and self.other_actor.id != event.other_actor.id: return # Filter to only a specific type if self.other_actor_type: if self.other_actor_type == "miscellaneous": if "traffic" not in event.other_actor.type_id \ and "static" not in event.other_actor.type_id: return else: if self.other_actor_type not in event.other_actor.type_id: return # Ignore it if its too close to a previous collision (avoid micro collisions) for collision_location in self.registered_collisions: distance_vector = actor_location - collision_location distance = math.sqrt(math.pow(distance_vector.x, 2) + math.pow(distance_vector.y, 2)) if distance <= self.MIN_AREA_OF_COLLISION: return if ('static' in event.other_actor.type_id or 'traffic' in event.other_actor.type_id) \ and 'sidewalk' not in event.other_actor.type_id: actor_type = TrafficEventType.COLLISION_STATIC elif 'vehicle' in event.other_actor.type_id: actor_type = TrafficEventType.COLLISION_VEHICLE elif 'walker' in event.other_actor.type_id: actor_type = TrafficEventType.COLLISION_PEDESTRIAN else: return collision_event = TrafficEvent(event_type=actor_type) collision_event.set_dict({ 'type': event.other_actor.type_id, 'id': event.other_actor.id, 'x': actor_location.x, 'y': actor_location.y, 'z': actor_location.z}) collision_event.set_message( "Agent collided against object with type={} and id={} at (x={}, y={}, z={})".format( event.other_actor.type_id, event.other_actor.id, round(actor_location.x, 3), round(actor_location.y, 3), round(actor_location.z, 3))) self.test_status = "FAILURE" self.actual_value += 1 self.collision_time = GameTime.get_time() self.registered_collisions.append(actor_location) self.list_traffic_events.append(collision_event) # Number 0: static objects -> ignore it if event.other_actor.id != 0: self.last_id = event.other_actor.id class ActorSpeedAboveThresholdTest(Criterion): """ This test will fail if the actor has had its linear velocity lower than a specific value for a specific amount of time Important parameters: - actor: CARLA actor to be used for this test - speed_threshold: speed required - below_threshold_max_time: Maximum time (in seconds) the actor can remain under the speed threshold - terminate_on_failure [optional]: If True, the complete scenario will terminate upon failure of this test """ def __init__(self, actor, speed_threshold, below_threshold_max_time, name="ActorSpeedAboveThresholdTest", terminate_on_failure=False): """ Class constructor. """ super(ActorSpeedAboveThresholdTest, self).__init__(name, actor, 0, terminate_on_failure=terminate_on_failure) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) self._actor = actor self._speed_threshold = speed_threshold self._below_threshold_max_time = below_threshold_max_time self._time_last_valid_state = None def update(self): """ Check if the actor speed is above the speed_threshold """ new_status = py_trees.common.Status.RUNNING linear_speed = CarlaDataProvider.get_velocity(self._actor) if linear_speed is not None: if linear_speed < self._speed_threshold and self._time_last_valid_state: if (GameTime.get_time() - self._time_last_valid_state) > self._below_threshold_max_time: # Game over. The actor has been "blocked" for too long self.test_status = "FAILURE" # record event vehicle_location = CarlaDataProvider.get_location(self._actor) blocked_event = TrafficEvent(event_type=TrafficEventType.VEHICLE_BLOCKED) ActorSpeedAboveThresholdTest._set_event_message(blocked_event, vehicle_location) ActorSpeedAboveThresholdTest._set_event_dict(blocked_event, vehicle_location) self.list_traffic_events.append(blocked_event) else: self._time_last_valid_state = GameTime.get_time() if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status @staticmethod def _set_event_message(event, location): """ Sets the message of the event """ event.set_message('Agent got blocked at (x={}, y={}, z={})'.format(round(location.x, 3), round(location.y, 3), round(location.z, 3))) @staticmethod def _set_event_dict(event, location): """ Sets the dictionary of the event """ event.set_dict({ 'x': location.x, 'y': location.y, 'z': location.z, }) class KeepLaneTest(Criterion): """ This class contains an atomic test for keeping lane. Important parameters: - actor: CARLA actor to be used for this test - optional [optional]: If True, the result is not considered for an overall pass/fail result """ def __init__(self, actor, optional=False, name="CheckKeepLane"): """ Construction with sensor setup """ super(KeepLaneTest, self).__init__(name, actor, 0, None, optional) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) world = self.actor.get_world() blueprint = world.get_blueprint_library().find('sensor.other.lane_invasion') self._lane_sensor = world.spawn_actor(blueprint, carla.Transform(), attach_to=self.actor) self._lane_sensor.listen(lambda event: self._count_lane_invasion(weakref.ref(self), event)) def update(self): """ Check lane invasion count """ new_status = py_trees.common.Status.RUNNING if self.actual_value > 0: self.test_status = "FAILURE" else: self.test_status = "SUCCESS" if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status def terminate(self, new_status): """ Cleanup sensor """ if self._lane_sensor is not None: self._lane_sensor.destroy() self._lane_sensor = None super(KeepLaneTest, self).terminate(new_status) @staticmethod def _count_lane_invasion(weak_self, event): """ Callback to update lane invasion count """ self = weak_self() if not self: return self.actual_value += 1 class ReachedRegionTest(Criterion): """ This class contains the reached region test The test is a success if the actor reaches a specified region Important parameters: - actor: CARLA actor to be used for this test - min_x, max_x, min_y, max_y: Bounding box of the checked region """ def __init__(self, actor, min_x, max_x, min_y, max_y, name="ReachedRegionTest"): """ Setup trigger region (rectangle provided by [min_x,min_y] and [max_x,max_y] """ super(ReachedRegionTest, self).__init__(name, actor, 0) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) self._actor = actor self._min_x = min_x self._max_x = max_x self._min_y = min_y self._max_y = max_y def update(self): """ Check if the actor location is within trigger region """ new_status = py_trees.common.Status.RUNNING location = CarlaDataProvider.get_location(self._actor) if location is None: return new_status in_region = False if self.test_status != "SUCCESS": in_region = (location.x > self._min_x and location.x < self._max_x) and ( location.y > self._min_y and location.y < self._max_y) if in_region: self.test_status = "SUCCESS" else: self.test_status = "RUNNING" if self.test_status == "SUCCESS": new_status = py_trees.common.Status.SUCCESS self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status class OffRoadTest(Criterion): """ Atomic containing a test to detect when an actor deviates from the driving lanes. This atomic can fail when actor has spent a specific time outside driving lanes (defined by OpenDRIVE). Simplified version of OnSidewalkTest, and doesn't relly on waypoints with *Sidewalk* lane types Args: actor (carla.Actor): CARLA actor to be used for this test duration (float): Time spent at sidewalks before the atomic fails. If terminate_on_failure isn't active, this is ignored. optional (bool): If True, the result is not considered for an overall pass/fail result when using the output argument terminate_on_failure (bool): If True, the atomic will fail when the duration condition has been met. """ def __init__(self, actor, duration=0, optional=False, terminate_on_failure=False, name="OffRoadTest"): """ Setup of the variables """ super(OffRoadTest, self).__init__(name, actor, 0, None, optional, terminate_on_failure) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) self._map = CarlaDataProvider.get_map() self._offroad = False self._duration = duration self._prev_time = None self._time_offroad = 0 def update(self): """ First, transforms the actor's current position to its corresponding waypoint. This is filtered to only use waypoints of type Driving or Parking. Depending on these results, the actor will be considered to be outside (or inside) driving lanes. returns: py_trees.common.Status.FAILURE: when the actor has spent a given duration outside driving lanes py_trees.common.Status.RUNNING: the rest of the time """ new_status = py_trees.common.Status.RUNNING current_location = CarlaDataProvider.get_location(self.actor) # Get the waypoint at the current location to see if the actor is offroad drive_waypoint = self._map.get_waypoint( current_location, project_to_road=False ) park_waypoint = self._map.get_waypoint( current_location, project_to_road=False, lane_type=carla.LaneType.Parking ) if drive_waypoint or park_waypoint: self._offroad = False else: self._offroad = True # Counts the time offroad if self._offroad: if self._prev_time is None: self._prev_time = GameTime.get_time() else: curr_time = GameTime.get_time() self._time_offroad += curr_time - self._prev_time self._prev_time = curr_time else: self._prev_time = None if self._time_offroad > self._duration: self.test_status = "FAILURE" if self._terminate_on_failure and self.test_status == "FAILURE": new_status = py_trees.common.Status.FAILURE self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status class EndofRoadTest(Criterion): """ Atomic containing a test to detect when an actor has changed to a different road Args: actor (carla.Actor): CARLA actor to be used for this test duration (float): Time spent after ending the road before the atomic fails. If terminate_on_failure isn't active, this is ignored. optional (bool): If True, the result is not considered for an overall pass/fail result when using the output argument terminate_on_failure (bool): If True, the atomic will fail when the duration condition has been met. """ def __init__(self, actor, duration=0, optional=False, terminate_on_failure=False, name="EndofRoadTest"): """ Setup of the variables """ super(EndofRoadTest, self).__init__(name, actor, 0, None, optional, terminate_on_failure) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) self._map = CarlaDataProvider.get_map() self._end_of_road = False self._duration = duration self._start_time = None self._time_end_road = 0 self._road_id = None def update(self): """ First, transforms the actor's current position to its corresponding waypoint. Then the road id is compared with the initial one and if that's the case, a time is started returns: py_trees.common.Status.FAILURE: when the actor has spent a given duration outside driving lanes py_trees.common.Status.RUNNING: the rest of the time """ new_status = py_trees.common.Status.RUNNING current_location = CarlaDataProvider.get_location(self.actor) current_waypoint = self._map.get_waypoint(current_location) # Get the current road id if self._road_id is None: self._road_id = current_waypoint.road_id else: # Wait until the actor has left the road if self._road_id != current_waypoint.road_id or self._start_time: # Start counting if self._start_time is None: self._start_time = GameTime.get_time() return new_status curr_time = GameTime.get_time() self._time_end_road = curr_time - self._start_time if self._time_end_road > self._duration: self.test_status = "FAILURE" self.actual_value += 1 return py_trees.common.Status.SUCCESS self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status class OnSidewalkTest(Criterion): """ Atomic containing a test to detect sidewalk invasions of a specific actor. This atomic can fail when actor has spent a specific time outside driving lanes (defined by OpenDRIVE). Args: actor (carla.Actor): CARLA actor to be used for this test duration (float): Time spent at sidewalks before the atomic fails. If terminate_on_failure isn't active, this is ignored. optional (bool): If True, the result is not considered for an overall pass/fail result when using the output argument terminate_on_failure (bool): If True, the atomic will fail when the duration condition has been met. """ def __init__(self, actor, duration=0, optional=False, terminate_on_failure=False, name="OnSidewalkTest"): """ Construction with sensor setup """ super(OnSidewalkTest, self).__init__(name, actor, 0, None, optional, terminate_on_failure) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) self._actor = actor self._map = CarlaDataProvider.get_map() self._onsidewalk_active = False self._outside_lane_active = False self._actor_location = self._actor.get_location() self._wrong_sidewalk_distance = 0 self._wrong_outside_lane_distance = 0 self._sidewalk_start_location = None self._outside_lane_start_location = None self._duration = duration self._prev_time = None self._time_outside_lanes = 0 def update(self): """ First, transforms the actor's current position as well as its four corners to their corresponding waypoints. Depending on their lane type, the actor will be considered to be outside (or inside) driving lanes. returns: py_trees.common.Status.FAILURE: when the actor has spent a given duration outside driving lanes and terminate_on_failure is active py_trees.common.Status.RUNNING: the rest of the time """ new_status = py_trees.common.Status.RUNNING if self._terminate_on_failure and self.test_status == "FAILURE": new_status = py_trees.common.Status.FAILURE # Some of the vehicle parameters current_tra = CarlaDataProvider.get_transform(self._actor) current_loc = current_tra.location current_wp = self._map.get_waypoint(current_loc, lane_type=carla.LaneType.Any) # Case 1) Car center is at a sidewalk if current_wp.lane_type == carla.LaneType.Sidewalk: if not self._onsidewalk_active: self._onsidewalk_active = True self._sidewalk_start_location = current_loc # Case 2) Not inside allowed zones (Driving and Parking) elif current_wp.lane_type not in (carla.LaneType.Driving, carla.LaneType.Parking): # Get the vertices of the vehicle heading_vec = current_tra.get_forward_vector() heading_vec.z = 0 heading_vec = heading_vec / math.sqrt(math.pow(heading_vec.x, 2) + math.pow(heading_vec.y, 2)) perpendicular_vec = carla.Vector3D(-heading_vec.y, heading_vec.x, 0) extent = self.actor.bounding_box.extent x_boundary_vector = heading_vec * extent.x y_boundary_vector = perpendicular_vec * extent.y bbox = [ current_loc + carla.Location(x_boundary_vector - y_boundary_vector), current_loc + carla.Location(x_boundary_vector + y_boundary_vector), current_loc + carla.Location(-1 * x_boundary_vector - y_boundary_vector), current_loc + carla.Location(-1 * x_boundary_vector + y_boundary_vector)] bbox_wp = [ self._map.get_waypoint(bbox[0], lane_type=carla.LaneType.Any), self._map.get_waypoint(bbox[1], lane_type=carla.LaneType.Any), self._map.get_waypoint(bbox[2], lane_type=carla.LaneType.Any), self._map.get_waypoint(bbox[3], lane_type=carla.LaneType.Any)] lane_type_list = [bbox_wp[0].lane_type, bbox_wp[1].lane_type, bbox_wp[2].lane_type, bbox_wp[3].lane_type] # Case 2.1) Not quite outside yet if bbox_wp[0].lane_type == (carla.LaneType.Driving or carla.LaneType.Parking) \ or bbox_wp[1].lane_type == (carla.LaneType.Driving or carla.LaneType.Parking) \ or bbox_wp[2].lane_type == (carla.LaneType.Driving or carla.LaneType.Parking) \ or bbox_wp[3].lane_type == (carla.LaneType.Driving or carla.LaneType.Parking): self._onsidewalk_active = False self._outside_lane_active = False # Case 2.2) At the mini Shoulders between Driving and Sidewalk elif carla.LaneType.Sidewalk in lane_type_list: if not self._onsidewalk_active: self._onsidewalk_active = True self._sidewalk_start_location = current_loc else: distance_vehicle_wp = current_loc.distance(current_wp.transform.location) # Case 2.3) Outside lane if distance_vehicle_wp >= current_wp.lane_width / 2: if not self._outside_lane_active: self._outside_lane_active = True self._outside_lane_start_location = current_loc # Case 2.4) Very very edge case (but still inside driving lanes) else: self._onsidewalk_active = False self._outside_lane_active = False # Case 3) Driving and Parking conditions else: # Check for false positives at junctions if current_wp.is_junction: distance_vehicle_wp = math.sqrt( math.pow(current_wp.transform.location.x - current_loc.x, 2) + math.pow(current_wp.transform.location.y - current_loc.y, 2)) if distance_vehicle_wp <= current_wp.lane_width / 2: self._onsidewalk_active = False self._outside_lane_active = False # Else, do nothing, the waypoint is too far to consider it a correct position else: self._onsidewalk_active = False self._outside_lane_active = False # Counts the time offroad if self._onsidewalk_active or self._outside_lane_active: if self._prev_time is None: self._prev_time = GameTime.get_time() else: curr_time = GameTime.get_time() self._time_outside_lanes += curr_time - self._prev_time self._prev_time = curr_time else: self._prev_time = None if self._time_outside_lanes > self._duration: self.test_status = "FAILURE" # Update the distances distance_vector = CarlaDataProvider.get_location(self._actor) - self._actor_location distance = math.sqrt(math.pow(distance_vector.x, 2) + math.pow(distance_vector.y, 2)) if distance >= 0.02: # Used to avoid micro-changes adding to considerable sums self._actor_location = CarlaDataProvider.get_location(self._actor) if self._onsidewalk_active: self._wrong_sidewalk_distance += distance elif self._outside_lane_active: # Only add if car is outside the lane but ISN'T in a junction self._wrong_outside_lane_distance += distance # Register the sidewalk event if not self._onsidewalk_active and self._wrong_sidewalk_distance > 0: self.actual_value += 1 onsidewalk_event = TrafficEvent(event_type=TrafficEventType.ON_SIDEWALK_INFRACTION) self._set_event_message( onsidewalk_event, self._sidewalk_start_location, self._wrong_sidewalk_distance) self._set_event_dict( onsidewalk_event, self._sidewalk_start_location, self._wrong_sidewalk_distance) self._onsidewalk_active = False self._wrong_sidewalk_distance = 0 self.list_traffic_events.append(onsidewalk_event) # Register the outside of a lane event if not self._outside_lane_active and self._wrong_outside_lane_distance > 0: self.actual_value += 1 outsidelane_event = TrafficEvent(event_type=TrafficEventType.OUTSIDE_LANE_INFRACTION) self._set_event_message( outsidelane_event, self._outside_lane_start_location, self._wrong_outside_lane_distance) self._set_event_dict( outsidelane_event, self._outside_lane_start_location, self._wrong_outside_lane_distance) self._outside_lane_active = False self._wrong_outside_lane_distance = 0 self.list_traffic_events.append(outsidelane_event) self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status def terminate(self, new_status): """ If there is currently an event running, it is registered """ # If currently at a sidewalk, register the event if self._onsidewalk_active: self.actual_value += 1 onsidewalk_event = TrafficEvent(event_type=TrafficEventType.ON_SIDEWALK_INFRACTION) self._set_event_message( onsidewalk_event, self._sidewalk_start_location, self._wrong_sidewalk_distance) self._set_event_dict( onsidewalk_event, self._sidewalk_start_location, self._wrong_sidewalk_distance) self._onsidewalk_active = False self._wrong_sidewalk_distance = 0 self.list_traffic_events.append(onsidewalk_event) # If currently outside of our lane, register the event if self._outside_lane_active: self.actual_value += 1 outsidelane_event = TrafficEvent(event_type=TrafficEventType.OUTSIDE_LANE_INFRACTION) self._set_event_message( outsidelane_event, self._outside_lane_start_location, self._wrong_outside_lane_distance) self._set_event_dict( outsidelane_event, self._outside_lane_start_location, self._wrong_outside_lane_distance) self._outside_lane_active = False self._wrong_outside_lane_distance = 0 self.list_traffic_events.append(outsidelane_event) super(OnSidewalkTest, self).terminate(new_status) def _set_event_message(self, event, location, distance): """ Sets the message of the event """ if event.get_type() == TrafficEventType.ON_SIDEWALK_INFRACTION: message_start = 'Agent invaded the sidewalk' else: message_start = 'Agent went outside the lane' event.set_message( '{} for about {} meters, starting at (x={}, y={}, z={})'.format( message_start, round(distance, 3), round(location.x, 3), round(location.y, 3), round(location.z, 3))) def _set_event_dict(self, event, location, distance): """ Sets the dictionary of the event """ event.set_dict({ 'x': location.x, 'y': location.y, 'z': location.z, 'distance': distance}) class OutsideRouteLanesTest(Criterion): """ Atomic to detect if the vehicle is either on a sidewalk or at a wrong lane. The distance spent outside is computed and it is returned as a percentage of the route distance traveled. Args: actor (carla.ACtor): CARLA actor to be used for this test route (list [carla.Location, connection]): series of locations representing the route waypoints optional (bool): If True, the result is not considered for an overall pass/fail result """ ALLOWED_OUT_DISTANCE = 1.3 # At least 0.5, due to the mini-shoulder between lanes and sidewalks MAX_ALLOWED_VEHICLE_ANGLE = 120.0 # Maximum angle between the yaw and waypoint lane MAX_ALLOWED_WAYPOINT_ANGLE = 150.0 # Maximum change between the yaw-lane angle between frames WINDOWS_SIZE = 3 # Amount of additional waypoints checked (in case the first on fails) def __init__(self, actor, route, optional=False, name="OutsideRouteLanesTest"): """ Constructor """ super(OutsideRouteLanesTest, self).__init__(name, actor, 0, None, optional) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) self._actor = actor self._route = route self._current_index = 0 self._route_length = len(self._route) self._waypoints, _ = zip(*self._route) self._map = CarlaDataProvider.get_map() self._pre_ego_waypoint = self._map.get_waypoint(self._actor.get_location()) self._outside_lane_active = False self._wrong_lane_active = False self._last_road_id = None self._last_lane_id = None self._total_distance = 0 self._wrong_distance = 0 def update(self): """ Transforms the actor location and its four corners to waypoints. Depending on its types, the actor will be considered to be at driving lanes, sidewalk or offroad. returns: py_trees.common.Status.FAILURE: when the actor has left driving and terminate_on_failure is active py_trees.common.Status.RUNNING: the rest of the time """ new_status = py_trees.common.Status.RUNNING if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE # Some of the vehicle parameters location = CarlaDataProvider.get_location(self._actor) if location is None: return new_status # 1) Check if outside route lanes self._is_outside_driving_lanes(location) self._is_at_wrong_lane(location) if self._outside_lane_active or self._wrong_lane_active: self.test_status = "FAILURE" # 2) Get the traveled distance for index in range(self._current_index + 1, min(self._current_index + self.WINDOWS_SIZE + 1, self._route_length)): # Get the dot product to know if it has passed this location index_location = self._waypoints[index] index_waypoint = self._map.get_waypoint(index_location) wp_dir = index_waypoint.transform.get_forward_vector() # Waypoint's forward vector wp_veh = location - index_location # vector waypoint - vehicle dot_ve_wp = wp_veh.x * wp_dir.x + wp_veh.y * wp_dir.y + wp_veh.z * wp_dir.z if dot_ve_wp > 0: # Get the distance traveled index_location = self._waypoints[index] current_index_location = self._waypoints[self._current_index] new_dist = current_index_location.distance(index_location) # Add it to the total distance self._current_index = index self._total_distance += new_dist # And to the wrong one if outside route lanes if self._outside_lane_active or self._wrong_lane_active: self._wrong_distance += new_dist self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status def _is_outside_driving_lanes(self, location): """ Detects if the ego_vehicle is outside driving lanes """ current_driving_wp = self._map.get_waypoint(location, lane_type=carla.LaneType.Driving, project_to_road=True) current_parking_wp = self._map.get_waypoint(location, lane_type=carla.LaneType.Parking, project_to_road=True) driving_distance = location.distance(current_driving_wp.transform.location) if current_parking_wp is not None: # Some towns have no parking parking_distance = location.distance(current_parking_wp.transform.location) else: parking_distance = float('inf') if driving_distance >= parking_distance: distance = parking_distance lane_width = current_parking_wp.lane_width else: distance = driving_distance lane_width = current_driving_wp.lane_width self._outside_lane_active = bool(distance > (lane_width / 2 + self.ALLOWED_OUT_DISTANCE)) def _is_at_wrong_lane(self, location): """ Detects if the ego_vehicle has invaded a wrong lane """ current_waypoint = self._map.get_waypoint(location, lane_type=carla.LaneType.Driving, project_to_road=True) current_lane_id = current_waypoint.lane_id current_road_id = current_waypoint.road_id # Lanes and roads are too chaotic at junctions if current_waypoint.is_junction: self._wrong_lane_active = False elif self._last_road_id != current_road_id or self._last_lane_id != current_lane_id: # Route direction can be considered continuous, except after exiting a junction. if self._pre_ego_waypoint.is_junction: yaw_waypt = current_waypoint.transform.rotation.yaw % 360 yaw_actor = self._actor.get_transform().rotation.yaw % 360 vehicle_lane_angle = (yaw_waypt - yaw_actor) % 360 if vehicle_lane_angle < self.MAX_ALLOWED_VEHICLE_ANGLE \ or vehicle_lane_angle > (360 - self.MAX_ALLOWED_VEHICLE_ANGLE): self._wrong_lane_active = False else: self._wrong_lane_active = True else: # Check for a big gap in waypoint directions. yaw_pre_wp = self._pre_ego_waypoint.transform.rotation.yaw % 360 yaw_cur_wp = current_waypoint.transform.rotation.yaw % 360 waypoint_angle = (yaw_pre_wp - yaw_cur_wp) % 360 if waypoint_angle >= self.MAX_ALLOWED_WAYPOINT_ANGLE \ and waypoint_angle <= (360 - self.MAX_ALLOWED_WAYPOINT_ANGLE): # pylint: disable=chained-comparison # Is the ego vehicle going back to the lane, or going out? Take the opposite self._wrong_lane_active = not bool(self._wrong_lane_active) else: # Changing to a lane with the same direction self._wrong_lane_active = False # Remember the last state self._last_lane_id = current_lane_id self._last_road_id = current_road_id self._pre_ego_waypoint = current_waypoint def terminate(self, new_status): """ If there is currently an event running, it is registered """ if self._wrong_distance > 0: percentage = self._wrong_distance / self._total_distance * 100 outside_lane = TrafficEvent(event_type=TrafficEventType.OUTSIDE_ROUTE_LANES_INFRACTION) outside_lane.set_message( "Agent went outside its route lanes for about {} meters " "({}% of the completed route)".format( round(self._wrong_distance, 3), round(percentage, 2))) outside_lane.set_dict({ 'distance': self._wrong_distance, 'percentage': percentage }) self._wrong_distance = 0 self.list_traffic_events.append(outside_lane) self.actual_value = round(percentage, 2) super(OutsideRouteLanesTest, self).terminate(new_status) class WrongLaneTest(Criterion): """ This class contains an atomic test to detect invasions to wrong direction lanes. Important parameters: - actor: CARLA actor to be used for this test - optional [optional]: If True, the result is not considered for an overall pass/fail result """ MAX_ALLOWED_ANGLE = 120.0 MAX_ALLOWED_WAYPOINT_ANGLE = 150.0 def __init__(self, actor, optional=False, name="WrongLaneTest"): """ Construction with sensor setup """ super(WrongLaneTest, self).__init__(name, actor, 0, None, optional) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) self._actor = actor self._map = CarlaDataProvider.get_map() self._last_lane_id = None self._last_road_id = None self._in_lane = True self._wrong_distance = 0 self._actor_location = self._actor.get_location() self._previous_lane_waypoint = self._map.get_waypoint(self._actor.get_location()) self._wrong_lane_start_location = None def update(self): """ Check lane invasion count """ new_status = py_trees.common.Status.RUNNING if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE lane_waypoint = self._map.get_waypoint(self._actor.get_location()) current_lane_id = lane_waypoint.lane_id current_road_id = lane_waypoint.road_id if (self._last_road_id != current_road_id or self._last_lane_id != current_lane_id) \ and not lane_waypoint.is_junction: next_waypoint = lane_waypoint.next(2.0)[0] if not next_waypoint: return new_status # The waypoint route direction can be considered continuous. # Therefore just check for a big gap in waypoint directions. previous_lane_direction = self._previous_lane_waypoint.transform.get_forward_vector() current_lane_direction = lane_waypoint.transform.get_forward_vector() p_lane_vector = np.array([previous_lane_direction.x, previous_lane_direction.y]) c_lane_vector = np.array([current_lane_direction.x, current_lane_direction.y]) waypoint_angle = math.degrees( math.acos(np.clip(np.dot(p_lane_vector, c_lane_vector) / (np.linalg.norm(p_lane_vector) * np.linalg.norm(c_lane_vector)), -1.0, 1.0))) if waypoint_angle > self.MAX_ALLOWED_WAYPOINT_ANGLE and self._in_lane: self.test_status = "FAILURE" self._in_lane = False self.actual_value += 1 self._wrong_lane_start_location = self._actor_location else: # Reset variables self._in_lane = True # Continuity is broken after a junction so check vehicle-lane angle instead if self._previous_lane_waypoint.is_junction: vector_wp = np.array([next_waypoint.transform.location.x - lane_waypoint.transform.location.x, next_waypoint.transform.location.y - lane_waypoint.transform.location.y]) vector_actor = np.array([math.cos(math.radians(self._actor.get_transform().rotation.yaw)), math.sin(math.radians(self._actor.get_transform().rotation.yaw))]) vehicle_lane_angle = math.degrees( math.acos(np.clip(np.dot(vector_actor, vector_wp) / (np.linalg.norm(vector_wp)), -1.0, 1.0))) if vehicle_lane_angle > self.MAX_ALLOWED_ANGLE: self.test_status = "FAILURE" self._in_lane = False self.actual_value += 1 self._wrong_lane_start_location = self._actor.get_location() # Keep adding "meters" to the counter distance_vector = self._actor.get_location() - self._actor_location distance = math.sqrt(math.pow(distance_vector.x, 2) + math.pow(distance_vector.y, 2)) if distance >= 0.02: # Used to avoid micro-changes adding add to considerable sums self._actor_location = CarlaDataProvider.get_location(self._actor) if not self._in_lane and not lane_waypoint.is_junction: self._wrong_distance += distance # Register the event if self._in_lane and self._wrong_distance > 0: wrong_way_event = TrafficEvent(event_type=TrafficEventType.WRONG_WAY_INFRACTION) self._set_event_message(wrong_way_event, self._wrong_lane_start_location, self._wrong_distance, current_road_id, current_lane_id) self._set_event_dict(wrong_way_event, self._wrong_lane_start_location, self._wrong_distance, current_road_id, current_lane_id) self.list_traffic_events.append(wrong_way_event) self._wrong_distance = 0 # Remember the last state self._last_lane_id = current_lane_id self._last_road_id = current_road_id self._previous_lane_waypoint = lane_waypoint self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status def terminate(self, new_status): """ If there is currently an event running, it is registered """ if not self._in_lane: lane_waypoint = self._map.get_waypoint(self._actor.get_location()) current_lane_id = lane_waypoint.lane_id current_road_id = lane_waypoint.road_id wrong_way_event = TrafficEvent(event_type=TrafficEventType.WRONG_WAY_INFRACTION) self._set_event_message(wrong_way_event, self._wrong_lane_start_location, self._wrong_distance, current_road_id, current_lane_id) self._set_event_dict(wrong_way_event, self._wrong_lane_start_location, self._wrong_distance, current_road_id, current_lane_id) self._wrong_distance = 0 self._in_lane = True self.list_traffic_events.append(wrong_way_event) super(WrongLaneTest, self).terminate(new_status) def _set_event_message(self, event, location, distance, road_id, lane_id): """ Sets the message of the event """ event.set_message( "Agent invaded a lane in opposite direction for {} meters, starting at (x={}, y={}, z={}). " "road_id={}, lane_id={}".format( round(distance, 3), round(location.x, 3), round(location.y, 3), round(location.z, 3), road_id, lane_id)) def _set_event_dict(self, event, location, distance, road_id, lane_id): """ Sets the dictionary of the event """ event.set_dict({ 'x': location.x, 'y': location.y, 'z': location.y, 'distance': distance, 'road_id': road_id, 'lane_id': lane_id}) class InRadiusRegionTest(Criterion): """ The test is a success if the actor is within a given radius of a specified region Important parameters: - actor: CARLA actor to be used for this test - x, y, radius: Position (x,y) and radius (in meters) used to get the checked region """ def __init__(self, actor, x, y, radius, name="InRadiusRegionTest"): """ """ super(InRadiusRegionTest, self).__init__(name, actor, 0) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) self._actor = actor self._x = x # pylint: disable=invalid-name self._y = y # pylint: disable=invalid-name self._radius = radius def update(self): """ Check if the actor location is within trigger region """ new_status = py_trees.common.Status.RUNNING location = CarlaDataProvider.get_location(self._actor) if location is None: return new_status if self.test_status != "SUCCESS": in_radius = math.sqrt(((location.x - self._x)**2) + ((location.y - self._y)**2)) < self._radius if in_radius: route_completion_event = TrafficEvent(event_type=TrafficEventType.ROUTE_COMPLETED) route_completion_event.set_message("Destination was successfully reached") self.list_traffic_events.append(route_completion_event) self.test_status = "SUCCESS" else: self.test_status = "RUNNING" if self.test_status == "SUCCESS": new_status = py_trees.common.Status.SUCCESS self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status class InRouteTest(Criterion): """ The test is a success if the actor is never outside route. The actor can go outside of the route but only for a certain amount of distance Important parameters: - actor: CARLA actor to be used for this test - route: Route to be checked - offroad_max: Maximum distance (in meters) the actor can deviate from the route - offroad_min: Maximum safe distance (in meters). Might eventually cause failure - terminate_on_failure [optional]: If True, the complete scenario will terminate upon failure of this test """ MAX_ROUTE_PERCENTAGE = 30 # % WINDOWS_SIZE = 5 # Amount of additional waypoints checked def __init__(self, actor, route, offroad_min=-1, offroad_max=30, name="InRouteTest", terminate_on_failure=False): """ """ super(InRouteTest, self).__init__(name, actor, 0, terminate_on_failure=terminate_on_failure) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) self._actor = actor self._route = route self._offroad_max = offroad_max # Unless specified, halve of the max value if offroad_min == -1: self._offroad_min = self._offroad_max / 2 else: self._offroad_min = self._offroad_min self._world = CarlaDataProvider.get_world() self._waypoints, _ = zip(*self._route) self._route_length = len(self._route) self._current_index = 0 self._out_route_distance = 0 self._in_safe_route = True self._accum_meters = [] prev_wp = self._waypoints[0] for i, wp in enumerate(self._waypoints): d = wp.distance(prev_wp) if i > 0: accum = self._accum_meters[i - 1] else: accum = 0 self._accum_meters.append(d + accum) prev_wp = wp # Blackboard variable blackv = py_trees.blackboard.Blackboard() _ = blackv.set("InRoute", True) def update(self): """ Check if the actor location is within trigger region """ new_status = py_trees.common.Status.RUNNING location = CarlaDataProvider.get_location(self._actor) if location is None: return new_status if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE elif self.test_status in ('RUNNING', 'INIT'): off_route = True shortest_distance = float('inf') closest_index = -1 # Get the closest distance for index in range(self._current_index, min(self._current_index + self.WINDOWS_SIZE + 1, self._route_length)): ref_waypoint = self._waypoints[index] distance = math.sqrt(((location.x - ref_waypoint.x) ** 2) + ((location.y - ref_waypoint.y) ** 2)) if distance <= shortest_distance: closest_index = index shortest_distance = distance if closest_index == -1 or shortest_distance == float('inf'): return new_status # Check if the actor is out of route if shortest_distance < self._offroad_max: off_route = False self._in_safe_route = bool(shortest_distance < self._offroad_min) # If actor advanced a step, record the distance if self._current_index != closest_index: new_dist = self._accum_meters[closest_index] - self._accum_meters[self._current_index] # If too far from the route, add it and check if its value if not self._in_safe_route: self._out_route_distance += new_dist out_route_percentage = 100 * self._out_route_distance / self._accum_meters[-1] if out_route_percentage > self.MAX_ROUTE_PERCENTAGE: off_route = True self._current_index = closest_index if off_route: # Blackboard variable blackv = py_trees.blackboard.Blackboard() _ = blackv.set("InRoute", False) route_deviation_event = TrafficEvent(event_type=TrafficEventType.ROUTE_DEVIATION) route_deviation_event.set_message( "Agent deviated from the route at (x={}, y={}, z={})".format( round(location.x, 3), round(location.y, 3), round(location.z, 3))) route_deviation_event.set_dict({ 'x': location.x, 'y': location.y, 'z': location.z}) self.list_traffic_events.append(route_deviation_event) self.test_status = "FAILURE" self.actual_value += 1 new_status = py_trees.common.Status.FAILURE self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status class RouteCompletionTest(Criterion): """ Check at which stage of the route is the actor at each tick Important parameters: - actor: CARLA actor to be used for this test - route: Route to be checked - terminate_on_failure [optional]: If True, the complete scenario will terminate upon failure of this test """ DISTANCE_THRESHOLD = 10.0 # meters WINDOWS_SIZE = 2 def __init__(self, actor, route, name="RouteCompletionTest", terminate_on_failure=False): """ """ super(RouteCompletionTest, self).__init__(name, actor, 100, terminate_on_failure=terminate_on_failure) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) self._actor = actor self._route = route self._map = CarlaDataProvider.get_map() self._wsize = self.WINDOWS_SIZE self._current_index = 0 self._route_length = len(self._route) self._waypoints, _ = zip(*self._route) self.target = self._waypoints[-1] self._accum_meters = [] prev_wp = self._waypoints[0] for i, wp in enumerate(self._waypoints): d = wp.distance(prev_wp) if i > 0: accum = self._accum_meters[i - 1] else: accum = 0 self._accum_meters.append(d + accum) prev_wp = wp self._traffic_event = TrafficEvent(event_type=TrafficEventType.ROUTE_COMPLETION) self.list_traffic_events.append(self._traffic_event) self._percentage_route_completed = 0.0 def update(self): """ Check if the actor location is within trigger region """ new_status = py_trees.common.Status.RUNNING location = CarlaDataProvider.get_location(self._actor) if location is None: return new_status if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE elif self.test_status in ('RUNNING', 'INIT'): for index in range(self._current_index, min(self._current_index + self._wsize + 1, self._route_length)): # Get the dot product to know if it has passed this location ref_waypoint = self._waypoints[index] wp = self._map.get_waypoint(ref_waypoint) wp_dir = wp.transform.get_forward_vector() # Waypoint's forward vector wp_veh = location - ref_waypoint # vector waypoint - vehicle dot_ve_wp = wp_veh.x * wp_dir.x + wp_veh.y * wp_dir.y + wp_veh.z * wp_dir.z if dot_ve_wp > 0: # good! segment completed! self._current_index = index self._percentage_route_completed = 100.0 * float(self._accum_meters[self._current_index]) \ / float(self._accum_meters[-1]) self._traffic_event.set_dict({ 'route_completed': self._percentage_route_completed}) self._traffic_event.set_message( "Agent has completed > {:.2f}% of the route".format( self._percentage_route_completed)) if self._percentage_route_completed > 99.0 and location.distance(self.target) < self.DISTANCE_THRESHOLD: route_completion_event = TrafficEvent(event_type=TrafficEventType.ROUTE_COMPLETED) route_completion_event.set_message("Destination was successfully reached") self.list_traffic_events.append(route_completion_event) self.test_status = "SUCCESS" self._percentage_route_completed = 100 elif self.test_status == "SUCCESS": new_status = py_trees.common.Status.SUCCESS self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status def terminate(self, new_status): """ Set test status to failure if not successful and terminate """ self.actual_value = round(self._percentage_route_completed, 2) if self.test_status == "INIT": self.test_status = "FAILURE" super(RouteCompletionTest, self).terminate(new_status) class RunningRedLightTest(Criterion): """ Check if an actor is running a red light Important parameters: - actor: CARLA actor to be used for this test - terminate_on_failure [optional]: If True, the complete scenario will terminate upon failure of this test """ DISTANCE_LIGHT = 15 # m def __init__(self, actor, name="RunningRedLightTest", terminate_on_failure=False): """ Init """ super(RunningRedLightTest, self).__init__(name, actor, 0, terminate_on_failure=terminate_on_failure) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) self._actor = actor self._world = actor.get_world() self._map = CarlaDataProvider.get_map() self._list_traffic_lights = [] self._last_red_light_id = None self.actual_value = 0 self.debug = False all_actors = self._world.get_actors() for _actor in all_actors: if 'traffic_light' in _actor.type_id: center, waypoints = self.get_traffic_light_waypoints(_actor) self._list_traffic_lights.append((_actor, center, waypoints)) # pylint: disable=no-self-use def is_vehicle_crossing_line(self, seg1, seg2): """ check if vehicle crosses a line segment """ line1 = shapely.geometry.LineString([(seg1[0].x, seg1[0].y), (seg1[1].x, seg1[1].y)]) line2 = shapely.geometry.LineString([(seg2[0].x, seg2[0].y), (seg2[1].x, seg2[1].y)]) inter = line1.intersection(line2) return not inter.is_empty def update(self): """ Check if the actor is running a red light """ new_status = py_trees.common.Status.RUNNING transform = CarlaDataProvider.get_transform(self._actor) location = transform.location if location is None: return new_status veh_extent = self._actor.bounding_box.extent.x tail_close_pt = self.rotate_point(carla.Vector3D(-0.8 * veh_extent, 0.0, location.z), transform.rotation.yaw) tail_close_pt = location + carla.Location(tail_close_pt) tail_far_pt = self.rotate_point(carla.Vector3D(-veh_extent - 1, 0.0, location.z), transform.rotation.yaw) tail_far_pt = location + carla.Location(tail_far_pt) for traffic_light, center, waypoints in self._list_traffic_lights: if self.debug: z = 2.1 if traffic_light.state == carla.TrafficLightState.Red: color = carla.Color(155, 0, 0) elif traffic_light.state == carla.TrafficLightState.Green: color = carla.Color(0, 155, 0) else: color = carla.Color(155, 155, 0) self._world.debug.draw_point(center + carla.Location(z=z), size=0.2, color=color, life_time=0.01) for wp in waypoints: text = "{}.{}".format(wp.road_id, wp.lane_id) self._world.debug.draw_string( wp.transform.location + carla.Location(x=1, z=z), text, color=color, life_time=0.01) self._world.debug.draw_point( wp.transform.location + carla.Location(z=z), size=0.1, color=color, life_time=0.01) center_loc = carla.Location(center) if self._last_red_light_id and self._last_red_light_id == traffic_light.id: continue if center_loc.distance(location) > self.DISTANCE_LIGHT: continue if traffic_light.state != carla.TrafficLightState.Red: continue for wp in waypoints: tail_wp = self._map.get_waypoint(tail_far_pt) # Calculate the dot product (Might be unscaled, as only its sign is important) ve_dir = CarlaDataProvider.get_transform(self._actor).get_forward_vector() wp_dir = wp.transform.get_forward_vector() dot_ve_wp = ve_dir.x * wp_dir.x + ve_dir.y * wp_dir.y + ve_dir.z * wp_dir.z # Check the lane until all the "tail" has passed if tail_wp.road_id == wp.road_id and tail_wp.lane_id == wp.lane_id and dot_ve_wp > 0: # This light is red and is affecting our lane yaw_wp = wp.transform.rotation.yaw lane_width = wp.lane_width location_wp = wp.transform.location lft_lane_wp = self.rotate_point(carla.Vector3D(0.4 * lane_width, 0.0, location_wp.z), yaw_wp + 90) lft_lane_wp = location_wp + carla.Location(lft_lane_wp) rgt_lane_wp = self.rotate_point(carla.Vector3D(0.4 * lane_width, 0.0, location_wp.z), yaw_wp - 90) rgt_lane_wp = location_wp + carla.Location(rgt_lane_wp) # Is the vehicle traversing the stop line? if self.is_vehicle_crossing_line((tail_close_pt, tail_far_pt), (lft_lane_wp, rgt_lane_wp)): self.test_status = "FAILURE" self.actual_value += 1 location = traffic_light.get_transform().location red_light_event = TrafficEvent(event_type=TrafficEventType.TRAFFIC_LIGHT_INFRACTION) red_light_event.set_message( "Agent ran a red light {} at (x={}, y={}, z={})".format( traffic_light.id, round(location.x, 3), round(location.y, 3), round(location.z, 3))) red_light_event.set_dict({ 'id': traffic_light.id, 'x': location.x, 'y': location.y, 'z': location.z}) self.list_traffic_events.append(red_light_event) self._last_red_light_id = traffic_light.id break if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status def rotate_point(self, point, angle): """ rotate a given point by a given angle """ x_ = math.cos(math.radians(angle)) * point.x - math.sin(math.radians(angle)) * point.y y_ = math.sin(math.radians(angle)) * point.x + math.cos(math.radians(angle)) * point.y return carla.Vector3D(x_, y_, point.z) def get_traffic_light_waypoints(self, traffic_light): """ get area of a given traffic light """ base_transform = traffic_light.get_transform() base_rot = base_transform.rotation.yaw area_loc = base_transform.transform(traffic_light.trigger_volume.location) # Discretize the trigger box into points area_ext = traffic_light.trigger_volume.extent x_values = np.arange(-0.9 * area_ext.x, 0.9 * area_ext.x, 1.0) # 0.9 to avoid crossing to adjacent lanes area = [] for x in x_values: point = self.rotate_point(carla.Vector3D(x, 0, area_ext.z), base_rot) point_location = area_loc + carla.Location(x=point.x, y=point.y) area.append(point_location) # Get the waypoints of these points, removing duplicates ini_wps = [] for pt in area: wpx = self._map.get_waypoint(pt) # As x_values are arranged in order, only the last one has to be checked if not ini_wps or ini_wps[-1].road_id != wpx.road_id or ini_wps[-1].lane_id != wpx.lane_id: ini_wps.append(wpx) # Advance them until the intersection wps = [] for wpx in ini_wps: while not wpx.is_intersection: next_wp = wpx.next(0.5)[0] if next_wp and not next_wp.is_intersection: wpx = next_wp else: break wps.append(wpx) return area_loc, wps class RunningStopTest(Criterion): """ Check if an actor is running a stop sign Important parameters: - actor: CARLA actor to be used for this test - terminate_on_failure [optional]: If True, the complete scenario will terminate upon failure of this test """ PROXIMITY_THRESHOLD = 50.0 # meters SPEED_THRESHOLD = 0.1 WAYPOINT_STEP = 1.0 # meters def __init__(self, actor, name="RunningStopTest", terminate_on_failure=False): """ """ super(RunningStopTest, self).__init__(name, actor, 0, terminate_on_failure=terminate_on_failure) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) self._actor = actor self._world = CarlaDataProvider.get_world() self._map = CarlaDataProvider.get_map() self._list_stop_signs = [] self._target_stop_sign = None self._stop_completed = False self._affected_by_stop = False self.actual_value = 0 all_actors = self._world.get_actors() for _actor in all_actors: if 'traffic.stop' in _actor.type_id: self._list_stop_signs.append(_actor) @staticmethod def point_inside_boundingbox(point, bb_center, bb_extent): """ X :param point: :param bb_center: :param bb_extent: :return: """ # pylint: disable=invalid-name A = carla.Vector2D(bb_center.x - bb_extent.x, bb_center.y - bb_extent.y) B = carla.Vector2D(bb_center.x + bb_extent.x, bb_center.y - bb_extent.y) D = carla.Vector2D(bb_center.x - bb_extent.x, bb_center.y + bb_extent.y) M = carla.Vector2D(point.x, point.y) AB = B - A AD = D - A AM = M - A am_ab = AM.x * AB.x + AM.y * AB.y ab_ab = AB.x * AB.x + AB.y * AB.y am_ad = AM.x * AD.x + AM.y * AD.y ad_ad = AD.x * AD.x + AD.y * AD.y return am_ab > 0 and am_ab < ab_ab and am_ad > 0 and am_ad < ad_ad # pylint: disable=chained-comparison def is_actor_affected_by_stop(self, actor, stop, multi_step=20): """ Check if the given actor is affected by the stop """ affected = False # first we run a fast coarse test current_location = actor.get_location() stop_location = stop.get_transform().location if stop_location.distance(current_location) > self.PROXIMITY_THRESHOLD: return affected stop_t = stop.get_transform() transformed_tv = stop_t.transform(stop.trigger_volume.location) # slower and accurate test based on waypoint's horizon and geometric test list_locations = [current_location] waypoint = self._map.get_waypoint(current_location) for _ in range(multi_step): if waypoint: next_wps = waypoint.next(self.WAYPOINT_STEP) if not next_wps: break waypoint = next_wps[0] if not waypoint: break list_locations.append(waypoint.transform.location) for actor_location in list_locations: if self.point_inside_boundingbox(actor_location, transformed_tv, stop.trigger_volume.extent): affected = True return affected def _scan_for_stop_sign(self): target_stop_sign = None ve_tra = CarlaDataProvider.get_transform(self._actor) ve_dir = ve_tra.get_forward_vector() wp = self._map.get_waypoint(ve_tra.location) wp_dir = wp.transform.get_forward_vector() dot_ve_wp = ve_dir.x * wp_dir.x + ve_dir.y * wp_dir.y + ve_dir.z * wp_dir.z if dot_ve_wp > 0: # Ignore all when going in a wrong lane for stop_sign in self._list_stop_signs: if self.is_actor_affected_by_stop(self._actor, stop_sign): # this stop sign is affecting the vehicle target_stop_sign = stop_sign break return target_stop_sign def update(self): """ Check if the actor is running a red light """ new_status = py_trees.common.Status.RUNNING location = self._actor.get_location() if location is None: return new_status if not self._target_stop_sign: # scan for stop signs self._target_stop_sign = self._scan_for_stop_sign() else: # we were in the middle of dealing with a stop sign if not self._stop_completed: # did the ego-vehicle stop? current_speed = CarlaDataProvider.get_velocity(self._actor) if current_speed < self.SPEED_THRESHOLD: self._stop_completed = True if not self._affected_by_stop: stop_location = self._target_stop_sign.get_location() stop_extent = self._target_stop_sign.trigger_volume.extent if self.point_inside_boundingbox(location, stop_location, stop_extent): self._affected_by_stop = True if not self.is_actor_affected_by_stop(self._actor, self._target_stop_sign): # is the vehicle out of the influence of this stop sign now? if not self._stop_completed and self._affected_by_stop: # did we stop? self.actual_value += 1 self.test_status = "FAILURE" stop_location = self._target_stop_sign.get_transform().location running_stop_event = TrafficEvent(event_type=TrafficEventType.STOP_INFRACTION) running_stop_event.set_message( "Agent ran a stop with id={} at (x={}, y={}, z={})".format( self._target_stop_sign.id, round(stop_location.x, 3), round(stop_location.y, 3), round(stop_location.z, 3))) running_stop_event.set_dict({ 'id': self._target_stop_sign.id, 'x': stop_location.x, 'y': stop_location.y, 'z': stop_location.z}) self.list_traffic_events.append(running_stop_event) # reset state self._target_stop_sign = None self._stop_completed = False self._affected_by_stop = False if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) return new_status