123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647 |
- #!/usr/bin/env python
- # Copyright (c) 2019 Intel Corporation
- #
- # This work is licensed under the terms of the MIT license.
- # For a copy, see <https://opensource.org/licenses/MIT>.
- """
- Summary of useful helper functions for scenarios
- """
- import math
- import shapely.geometry
- import shapely.affinity
- import numpy as np
- import carla
- from agents.tools.misc import vector
- from agents.navigation.local_planner import RoadOption
- from srunner.scenariomanager.carla_data_provider import CarlaDataProvider
- def get_distance_along_route(route, target_location):
- """
- Calculate the distance of the given location along the route
- Note: If the location is not along the route, the route length will be returned
- """
- wmap = CarlaDataProvider.get_map()
- covered_distance = 0
- prev_position = None
- found = False
- # Don't use the input location, use the corresponding wp as location
- target_location_from_wp = wmap.get_waypoint(target_location).transform.location
- for position, _ in route:
- location = target_location_from_wp
- # Don't perform any calculations for the first route point
- if not prev_position:
- prev_position = position
- continue
- # Calculate distance between previous and current route point
- interval_length_squared = ((prev_position.x - position.x) ** 2) + ((prev_position.y - position.y) ** 2)
- distance_squared = ((location.x - prev_position.x) ** 2) + ((location.y - prev_position.y) ** 2)
- # Close to the current position? Stop calculation
- if distance_squared < 0.01:
- break
- if distance_squared < 400 and not distance_squared < interval_length_squared:
- # Check if a neighbor lane is closer to the route
- # Do this only in a close distance to correct route interval, otherwise the computation load is too high
- starting_wp = wmap.get_waypoint(location)
- wp = starting_wp.get_left_lane()
- while wp is not None:
- new_location = wp.transform.location
- new_distance_squared = ((new_location.x - prev_position.x) ** 2) + (
- (new_location.y - prev_position.y) ** 2)
- if np.sign(starting_wp.lane_id) != np.sign(wp.lane_id):
- break
- if new_distance_squared < distance_squared:
- distance_squared = new_distance_squared
- location = new_location
- else:
- break
- wp = wp.get_left_lane()
- wp = starting_wp.get_right_lane()
- while wp is not None:
- new_location = wp.transform.location
- new_distance_squared = ((new_location.x - prev_position.x) ** 2) + (
- (new_location.y - prev_position.y) ** 2)
- if np.sign(starting_wp.lane_id) != np.sign(wp.lane_id):
- break
- if new_distance_squared < distance_squared:
- distance_squared = new_distance_squared
- location = new_location
- else:
- break
- wp = wp.get_right_lane()
- if distance_squared < interval_length_squared:
- # The location could be inside the current route interval, if route/lane ids match
- # Note: This assumes a sufficiently small route interval
- # An alternative is to compare orientations, however, this also does not work for
- # long route intervals
- curr_wp = wmap.get_waypoint(position)
- prev_wp = wmap.get_waypoint(prev_position)
- wp = wmap.get_waypoint(location)
- if prev_wp and curr_wp and wp:
- if wp.road_id in (prev_wp.road_id, curr_wp.road_id):
- # Roads match, now compare the sign of the lane ids
- if (np.sign(wp.lane_id) == np.sign(prev_wp.lane_id) or
- np.sign(wp.lane_id) == np.sign(curr_wp.lane_id)):
- # The location is within the current route interval
- covered_distance += math.sqrt(distance_squared)
- found = True
- break
- covered_distance += math.sqrt(interval_length_squared)
- prev_position = position
- return covered_distance, found
- def get_crossing_point(actor):
- """
- Get the next crossing point location in front of the ego vehicle
- @return point of crossing
- """
- wp_cross = CarlaDataProvider.get_map().get_waypoint(actor.get_location())
- while not wp_cross.is_intersection:
- wp_cross = wp_cross.next(2)[0]
- crossing = carla.Location(x=wp_cross.transform.location.x,
- y=wp_cross.transform.location.y, z=wp_cross.transform.location.z)
- return crossing
- def get_geometric_linear_intersection(ego_actor, other_actor):
- """
- Obtain a intersection point between two actor's location by using their waypoints (wp)
- @return point of intersection of the two vehicles
- """
- wp_ego_1 = CarlaDataProvider.get_map().get_waypoint(ego_actor.get_location())
- wp_ego_2 = wp_ego_1.next(1)[0]
- x_ego_1 = wp_ego_1.transform.location.x
- y_ego_1 = wp_ego_1.transform.location.y
- x_ego_2 = wp_ego_2.transform.location.x
- y_ego_2 = wp_ego_2.transform.location.y
- wp_other_1 = CarlaDataProvider.get_world().get_map().get_waypoint(other_actor.get_location())
- wp_other_2 = wp_other_1.next(1)[0]
- x_other_1 = wp_other_1.transform.location.x
- y_other_1 = wp_other_1.transform.location.y
- x_other_2 = wp_other_2.transform.location.x
- y_other_2 = wp_other_2.transform.location.y
- s = np.vstack([(x_ego_1, y_ego_1), (x_ego_2, y_ego_2), (x_other_1, y_other_1), (x_other_2, y_other_2)])
- h = np.hstack((s, np.ones((4, 1))))
- line1 = np.cross(h[0], h[1])
- line2 = np.cross(h[2], h[3])
- x, y, z = np.cross(line1, line2)
- if z == 0:
- return (float('inf'), float('inf'))
- intersection = carla.Location(x=x / z, y=y / z, z=0)
- return intersection
- def get_location_in_distance(actor, distance):
- """
- Obtain a location in a given distance from the current actor's location.
- Note: Search is stopped on first intersection.
- @return obtained location and the traveled distance
- """
- waypoint = CarlaDataProvider.get_map().get_waypoint(actor.get_location())
- traveled_distance = 0
- while not waypoint.is_intersection and traveled_distance < distance:
- waypoint_new = waypoint.next(1.0)[-1]
- traveled_distance += waypoint_new.transform.location.distance(waypoint.transform.location)
- waypoint = waypoint_new
- return waypoint.transform.location, traveled_distance
- def get_location_in_distance_from_wp(waypoint, distance, stop_at_junction=True):
- """
- Obtain a location in a given distance from the current actor's location.
- Note: Search is stopped on first intersection.
- @return obtained location and the traveled distance
- """
- traveled_distance = 0
- while not (waypoint.is_intersection and stop_at_junction) and traveled_distance < distance:
- wp_next = waypoint.next(1.0)
- if wp_next:
- waypoint_new = wp_next[-1]
- traveled_distance += waypoint_new.transform.location.distance(waypoint.transform.location)
- waypoint = waypoint_new
- else:
- break
- return waypoint.transform.location, traveled_distance
- def get_waypoint_in_distance(waypoint, distance):
- """
- Obtain a waypoint in a given distance from the current actor's location.
- Note: Search is stopped on first intersection.
- @return obtained waypoint and the traveled distance
- """
- traveled_distance = 0
- while not waypoint.is_intersection and traveled_distance < distance:
- waypoint_new = waypoint.next(1.0)[-1]
- traveled_distance += waypoint_new.transform.location.distance(waypoint.transform.location)
- waypoint = waypoint_new
- return waypoint, traveled_distance
- def generate_target_waypoint_list(waypoint, turn=0):
- """
- This method follow waypoints to a junction and choose path based on turn input.
- Turn input: LEFT -> -1, RIGHT -> 1, STRAIGHT -> 0
- @returns a waypoint list from the starting point to the end point according to turn input
- """
- reached_junction = False
- threshold = math.radians(0.1)
- plan = []
- while True:
- wp_choice = waypoint.next(2)
- if len(wp_choice) > 1:
- reached_junction = True
- waypoint = choose_at_junction(waypoint, wp_choice, turn)
- else:
- waypoint = wp_choice[0]
- plan.append((waypoint, RoadOption.LANEFOLLOW))
- # End condition for the behavior
- if turn != 0 and reached_junction and len(plan) >= 3:
- v_1 = vector(
- plan[-2][0].transform.location,
- plan[-1][0].transform.location)
- v_2 = vector(
- plan[-3][0].transform.location,
- plan[-2][0].transform.location)
- angle_wp = math.acos(
- np.dot(v_1, v_2) / abs((np.linalg.norm(v_1) * np.linalg.norm(v_2))))
- if angle_wp < threshold:
- break
- elif reached_junction and not plan[-1][0].is_intersection:
- break
- return plan, plan[-1][0]
- def generate_target_waypoint_list_multilane(waypoint, change='left', # pylint: disable=too-many-return-statements
- distance_same_lane=10, distance_other_lane=25,
- total_lane_change_distance=25, check=True,
- lane_changes=1, step_distance=2):
- """
- This methods generates a waypoint list which leads the vehicle to a parallel lane.
- The change input must be 'left' or 'right', depending on which lane you want to change.
- The default step distance between waypoints on the same lane is 2m.
- The default step distance between the lane change is set to 25m.
- @returns a waypoint list from the starting point to the end point on a right or left parallel lane.
- The function might break before reaching the end point, if the asked behavior is impossible.
- """
- plan = []
- plan.append((waypoint, RoadOption.LANEFOLLOW)) # start position
- option = RoadOption.LANEFOLLOW
- # Same lane
- distance = 0
- while distance < distance_same_lane:
- next_wps = plan[-1][0].next(step_distance)
- if not next_wps:
- return None, None
- next_wp = next_wps[0]
- distance += next_wp.transform.location.distance(plan[-1][0].transform.location)
- plan.append((next_wp, RoadOption.LANEFOLLOW))
- if change == 'left':
- option = RoadOption.CHANGELANELEFT
- elif change == 'right':
- option = RoadOption.CHANGELANERIGHT
- else:
- # ERROR, input value for change must be 'left' or 'right'
- return None, None
- lane_changes_done = 0
- lane_change_distance = total_lane_change_distance / lane_changes
- # Lane change
- while lane_changes_done < lane_changes:
- # Move forward
- next_wps = plan[-1][0].next(lane_change_distance)
- if not next_wps:
- return None, None
- next_wp = next_wps[0]
- # Get the side lane
- if change == 'left':
- if check and str(next_wp.lane_change) not in ['Left', 'Both']:
- return None, None
- side_wp = next_wp.get_left_lane()
- else:
- if check and str(next_wp.lane_change) not in ['Right', 'Both']:
- return None, None
- side_wp = next_wp.get_right_lane()
- if not side_wp or side_wp.lane_type != carla.LaneType.Driving:
- return None, None
- # Update the plan
- plan.append((side_wp, option))
- lane_changes_done += 1
- # Other lane
- distance = 0
- while distance < distance_other_lane:
- next_wps = plan[-1][0].next(step_distance)
- if not next_wps:
- return None, None
- next_wp = next_wps[0]
- distance += next_wp.transform.location.distance(plan[-1][0].transform.location)
- plan.append((next_wp, RoadOption.LANEFOLLOW))
- target_lane_id = plan[-1][0].lane_id
- return plan, target_lane_id
- def generate_target_waypoint(waypoint, turn=0):
- """
- This method follow waypoints to a junction and choose path based on turn input.
- Turn input: LEFT -> -1, RIGHT -> 1, STRAIGHT -> 0
- @returns a waypoint list according to turn input
- """
- sampling_radius = 1
- reached_junction = False
- wp_list = []
- while True:
- wp_choice = waypoint.next(sampling_radius)
- # Choose path at intersection
- if not reached_junction and (len(wp_choice) > 1 or wp_choice[0].is_junction):
- reached_junction = True
- waypoint = choose_at_junction(waypoint, wp_choice, turn)
- else:
- waypoint = wp_choice[0]
- wp_list.append(waypoint)
- # End condition for the behavior
- if reached_junction and not wp_list[-1].is_junction:
- break
- return wp_list[-1]
- def generate_target_waypoint_in_route(waypoint, route):
- """
- This method follow waypoints to a junction
- @returns a waypoint list according to turn input
- """
- wmap = CarlaDataProvider.get_map()
- reached_junction = False
- # Get the route location
- shortest_distance = float('inf')
- for index, route_pos in enumerate(route):
- wp = route_pos[0]
- trigger_location = waypoint.transform.location
- dist_to_route = trigger_location.distance(wp)
- if dist_to_route <= shortest_distance:
- closest_index = index
- shortest_distance = dist_to_route
- route_location = route[closest_index][0]
- index = closest_index
- while True:
- # Get the next route location
- index = min(index + 1, len(route))
- route_location = route[index][0]
- road_option = route[index][1]
- # Enter the junction
- if not reached_junction and (road_option in (RoadOption.LEFT, RoadOption.RIGHT, RoadOption.STRAIGHT)):
- reached_junction = True
- # End condition for the behavior, at the end of the junction
- if reached_junction and (road_option not in (RoadOption.LEFT, RoadOption.RIGHT, RoadOption.STRAIGHT)):
- break
- return wmap.get_waypoint(route_location)
- def choose_at_junction(current_waypoint, next_choices, direction=0):
- """
- This function chooses the appropriate waypoint from next_choices based on direction
- """
- current_transform = current_waypoint.transform
- current_location = current_transform.location
- projected_location = current_location + \
- carla.Location(
- x=math.cos(math.radians(current_transform.rotation.yaw)),
- y=math.sin(math.radians(current_transform.rotation.yaw)))
- current_vector = vector(current_location, projected_location)
- cross_list = []
- cross_to_waypoint = {}
- for waypoint in next_choices:
- waypoint = waypoint.next(10)[0]
- select_vector = vector(current_location, waypoint.transform.location)
- cross = np.cross(current_vector, select_vector)[2]
- cross_list.append(cross)
- cross_to_waypoint[cross] = waypoint
- select_cross = None
- if direction > 0:
- select_cross = max(cross_list)
- elif direction < 0:
- select_cross = min(cross_list)
- else:
- select_cross = min(cross_list, key=abs)
- return cross_to_waypoint[select_cross]
- def get_intersection(ego_actor, other_actor):
- """
- Obtain a intersection point between two actor's location
- @return the intersection location
- """
- waypoint = CarlaDataProvider.get_map().get_waypoint(ego_actor.get_location())
- waypoint_other = CarlaDataProvider.get_map().get_waypoint(other_actor.get_location())
- max_dist = float("inf")
- distance = float("inf")
- while distance <= max_dist:
- max_dist = distance
- current_location = waypoint.transform.location
- waypoint_choice = waypoint.next(1)
- # Select the straighter path at intersection
- if len(waypoint_choice) > 1:
- max_dot = -1 * float('inf')
- loc_projection = current_location + carla.Location(
- x=math.cos(math.radians(waypoint.transform.rotation.yaw)),
- y=math.sin(math.radians(waypoint.transform.rotation.yaw)))
- v_current = vector(current_location, loc_projection)
- for wp_select in waypoint_choice:
- v_select = vector(current_location, wp_select.transform.location)
- dot_select = np.dot(v_current, v_select)
- if dot_select > max_dot:
- max_dot = dot_select
- waypoint = wp_select
- else:
- waypoint = waypoint_choice[0]
- distance = current_location.distance(waypoint_other.transform.location)
- return current_location
- def detect_lane_obstacle(actor, extension_factor=3, margin=1.02):
- """
- This function identifies if an obstacle is present in front of the reference actor
- """
- world = CarlaDataProvider.get_world()
- world_actors = world.get_actors().filter('vehicle.*')
- actor_bbox = actor.bounding_box
- actor_transform = actor.get_transform()
- actor_location = actor_transform.location
- actor_vector = actor_transform.rotation.get_forward_vector()
- actor_vector = np.array([actor_vector.x, actor_vector.y])
- actor_vector = actor_vector / np.linalg.norm(actor_vector)
- actor_vector = actor_vector * (extension_factor - 1) * actor_bbox.extent.x
- actor_location = actor_location + carla.Location(actor_vector[0], actor_vector[1])
- actor_yaw = actor_transform.rotation.yaw
- is_hazard = False
- for adversary in world_actors:
- if adversary.id != actor.id and \
- actor_transform.location.distance(adversary.get_location()) < 50:
- adversary_bbox = adversary.bounding_box
- adversary_transform = adversary.get_transform()
- adversary_loc = adversary_transform.location
- adversary_yaw = adversary_transform.rotation.yaw
- overlap_adversary = RotatedRectangle(
- adversary_loc.x, adversary_loc.y,
- 2 * margin * adversary_bbox.extent.x, 2 * margin * adversary_bbox.extent.y, adversary_yaw)
- overlap_actor = RotatedRectangle(
- actor_location.x, actor_location.y,
- 2 * margin * actor_bbox.extent.x * extension_factor, 2 * margin * actor_bbox.extent.y, actor_yaw)
- overlap_area = overlap_adversary.intersection(overlap_actor).area
- if overlap_area > 0:
- is_hazard = True
- break
- return is_hazard
- def get_offset_transform(transform, offset):
- """
- This function adjusts the give transform by offset and returns the new transform.
- """
- if offset != 0:
- forward_vector = transform.rotation.get_forward_vector()
- orthogonal_vector = carla.Vector3D(x=-forward_vector.y, y=forward_vector.x, z=forward_vector.z)
- transform.location.x = transform.location.x + offset * orthogonal_vector.x
- transform.location.y = transform.location.y + offset * orthogonal_vector.y
- return transform
- def get_troad_from_transform(actor_transform):
- """
- This function finds the lateral road position (t) from actor_transform
- """
- actor_loc = actor_transform.location
- c_wp = CarlaDataProvider.get_map().get_waypoint(actor_loc)
- left_lanes, right_lanes = [], []
- # opendrive standard: (left ==> +ve lane_id) and (right ==> -ve lane_id)
- ref_lane = CarlaDataProvider.get_map().get_waypoint_xodr(c_wp.road_id, 0, c_wp.s)
- for i in range(-50, 50):
- _wp = CarlaDataProvider.get_map().get_waypoint_xodr(c_wp.road_id, i, c_wp.s)
- if _wp:
- if i < 0:
- left_lanes.append(_wp)
- elif i > 0:
- right_lanes.append(_wp)
- if left_lanes:
- left_lane_ids = [ln.lane_id for ln in left_lanes]
- lm_id = min(left_lane_ids)
- lm_lane = left_lanes[left_lane_ids.index(lm_id)]
- lm_lane_offset = lm_lane.lane_width / 2
- else:
- lm_lane, lm_lane_offset = ref_lane, 0
- lm_tr = get_offset_transform(carla.Transform(lm_lane.transform.location, lm_lane.transform.rotation),
- lm_lane_offset)
- distance_from_lm_lane_edge = lm_tr.location.distance(actor_loc)
- distance_from_lm_lane_ref_lane = lm_tr.location.distance(ref_lane.transform.location)
- if right_lanes:
- right_lane_ids = [ln.lane_id for ln in right_lanes]
- rm_id = max(right_lane_ids)
- rm_lane = right_lanes[right_lane_ids.index(rm_id)]
- rm_lane_offset = -rm_lane.lane_width / 2
- else:
- rm_lane, rm_lane_offset = ref_lane, -distance_from_lm_lane_ref_lane
- distance_from_rm_lane_edge = get_offset_transform(carla.Transform(rm_lane.transform.location,
- rm_lane.transform.rotation),
- rm_lane_offset).location.distance(actor_loc)
- t_road = ref_lane.transform.location.distance(actor_loc)
- if right_lanes == [] or left_lanes == []:
- closest_road_edge = min(distance_from_lm_lane_edge, distance_from_rm_lane_edge)
- if closest_road_edge == distance_from_lm_lane_edge:
- t_road = -1*t_road
- else:
- if c_wp.lane_id < 0:
- t_road = -1*t_road
- return t_road
- def get_distance_between_actors(current, target, distance_type="euclidianDistance", freespace=False,
- global_planner=None):
- """
- This function finds the distance between actors for different use cases described by distance_type and freespace
- attributes
- """
- target_transform = CarlaDataProvider.get_transform(target)
- current_transform = CarlaDataProvider.get_transform(current)
- target_wp = CarlaDataProvider.get_map().get_waypoint(target_transform.location)
- current_wp = CarlaDataProvider.get_map().get_waypoint(current_transform.location)
- extent_sum_x, extent_sum_y = 0, 0
- if freespace:
- if isinstance(target, (carla.Vehicle, carla.Walker)):
- extent_sum_x = target.bounding_box.extent.x + current.bounding_box.extent.x
- extent_sum_y = target.bounding_box.extent.y + current.bounding_box.extent.y
- if distance_type == "longitudinal":
- if not current_wp.road_id == target_wp.road_id:
- distance = 0
- # Get the route
- route = global_planner.trace_route(current_transform.location, target_transform.location)
- # Get the distance of the route
- for i in range(1, len(route)):
- curr_loc = route[i][0].transform.location
- prev_loc = route[i - 1][0].transform.location
- distance += curr_loc.distance(prev_loc)
- else:
- distance = abs(current_wp.s - target_wp.s)
- if freespace:
- distance = distance - extent_sum_x
- elif distance_type == "lateral":
- target_t = get_troad_from_transform(target_transform)
- current_t = get_troad_from_transform(current_transform)
- distance = abs(target_t - current_t)
- if freespace:
- distance = distance - extent_sum_y
- elif distance_type in ["cartesianDistance", "euclidianDistance"]:
- distance = target_transform.location.distance(current_transform.location)
- if freespace:
- distance = distance - extent_sum_x
- else:
- raise TypeError("unknown distance_type: {}".format(distance_type))
- # distance will be negative for feeespace when there is overlap condition
- # truncate to 0.0 when this happens
- distance = 0.0 if distance < 0.0 else distance
- return distance
- class RotatedRectangle(object):
- """
- This class contains method to draw rectangle and find intersection point.
- """
- def __init__(self, c_x, c_y, width, height, angle):
- self.c_x = c_x
- self.c_y = c_y
- self.w = width # pylint: disable=invalid-name
- self.h = height # pylint: disable=invalid-name
- self.angle = angle
- def get_contour(self):
- """
- create contour
- """
- w = self.w
- h = self.h
- c = shapely.geometry.box(-w / 2.0, -h / 2.0, w / 2.0, h / 2.0)
- rc = shapely.affinity.rotate(c, self.angle)
- return shapely.affinity.translate(rc, self.c_x, self.c_y)
- def intersection(self, other):
- """
- Obtain a intersection point between two contour.
- """
- return self.get_contour().intersection(other.get_contour())
|