123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325 |
- #!/usr/bin/env python
- # This work is licensed under the terms of the MIT license.
- # For a copy, see <https://opensource.org/licenses/MIT>.
- """
- Module used to parse all the route and scenario configuration parameters.
- """
- import json
- import math
- import xml.etree.ElementTree as ET
- import carla
- from agents.navigation.local_planner import RoadOption
- from srunner.scenarioconfigs.route_scenario_configuration import RouteScenarioConfiguration
- # TODO check this threshold, it could be a bit larger but not so large that we cluster scenarios.
- TRIGGER_THRESHOLD = 2.0 # Threshold to say if a trigger position is new or repeated, works for matching positions
- TRIGGER_ANGLE_THRESHOLD = 10 # Threshold to say if two angles can be considering matching when matching transforms.
- class RouteParser(object):
- """
- Pure static class used to parse all the route and scenario configuration parameters.
- """
- @staticmethod
- def parse_annotations_file(annotation_filename):
- """
- Return the annotations of which positions where the scenarios are going to happen.
- :param annotation_filename: the filename for the anotations file
- :return:
- """
- with open(annotation_filename, 'r', encoding='utf-8') as f:
- annotation_dict = json.loads(f.read())
- final_dict = {}
- for town_dict in annotation_dict['available_scenarios']:
- final_dict.update(town_dict)
- return final_dict # the file has a current maps name that is an one element vec
- @staticmethod
- def parse_routes_file(route_filename, scenario_file, single_route=None):
- """
- Returns a list of route elements.
- :param route_filename: the path to a set of routes.
- :param single_route: If set, only this route shall be returned
- :return: List of dicts containing the waypoints, id and town of the routes
- """
- list_route_descriptions = []
- tree = ET.parse(route_filename)
- for route in tree.iter("route"):
- route_id = route.attrib['id']
- if single_route and route_id != single_route:
- continue
- new_config = RouteScenarioConfiguration()
- new_config.town = route.attrib['town']
- new_config.name = "RouteScenario_{}".format(route_id)
- new_config.weather = RouteParser.parse_weather(route)
- new_config.scenario_file = scenario_file
- waypoint_list = [] # the list of waypoints that can be found on this route
- for waypoint in route.iter('waypoint'):
- waypoint_list.append(carla.Location(x=float(waypoint.attrib['x']),
- y=float(waypoint.attrib['y']),
- z=float(waypoint.attrib['z'])))
- new_config.trajectory = waypoint_list
- list_route_descriptions.append(new_config)
- return list_route_descriptions
- @staticmethod
- def parse_weather(route):
- """
- Returns a carla.WeatherParameters with the corresponding weather for that route. If the route
- has no weather attribute, the default one is triggered.
- """
- route_weather = route.find("weather")
- if route_weather is None:
- weather = carla.WeatherParameters(sun_altitude_angle=70)
- else:
- weather = carla.WeatherParameters()
- for weather_attrib in route.iter("weather"):
- if 'cloudiness' in weather_attrib.attrib:
- weather.cloudiness = float(weather_attrib.attrib['cloudiness'])
- if 'precipitation' in weather_attrib.attrib:
- weather.precipitation = float(weather_attrib.attrib['precipitation'])
- if 'precipitation_deposits' in weather_attrib.attrib:
- weather.precipitation_deposits = float(weather_attrib.attrib['precipitation_deposits'])
- if 'wind_intensity' in weather_attrib.attrib:
- weather.wind_intensity = float(weather_attrib.attrib['wind_intensity'])
- if 'sun_azimuth_angle' in weather_attrib.attrib:
- weather.sun_azimuth_angle = float(weather_attrib.attrib['sun_azimuth_angle'])
- if 'sun_altitude_angle' in weather_attrib.attrib:
- weather.sun_altitude_angle = float(weather_attrib.attrib['sun_altitude_angle'])
- if 'wetness' in weather_attrib.attrib:
- weather.wetness = float(weather_attrib.attrib['wetness'])
- if 'fog_distance' in weather_attrib.attrib:
- weather.fog_distance = float(weather_attrib.attrib['fog_distance'])
- if 'fog_density' in weather_attrib.attrib:
- weather.fog_density = float(weather_attrib.attrib['fog_density'])
- return weather
- @staticmethod
- def check_trigger_position(new_trigger, existing_triggers):
- """
- Check if this trigger position already exists or if it is a new one.
- :param new_trigger:
- :param existing_triggers:
- :return:
- """
- for trigger_id in existing_triggers.keys():
- trigger = existing_triggers[trigger_id]
- dx = trigger['x'] - new_trigger['x']
- dy = trigger['y'] - new_trigger['y']
- distance = math.sqrt(dx * dx + dy * dy)
- dyaw = (trigger['yaw'] - new_trigger['yaw']) % 360
- if distance < TRIGGER_THRESHOLD \
- and (dyaw < TRIGGER_ANGLE_THRESHOLD or dyaw > (360 - TRIGGER_ANGLE_THRESHOLD)):
- return trigger_id
- return None
- @staticmethod
- def convert_waypoint_float(waypoint):
- """
- Convert waypoint values to float
- """
- waypoint['x'] = float(waypoint['x'])
- waypoint['y'] = float(waypoint['y'])
- waypoint['z'] = float(waypoint['z'])
- waypoint['yaw'] = float(waypoint['yaw'])
- @staticmethod
- def match_world_location_to_route(world_location, route_description):
- """
- We match this location to a given route.
- world_location:
- route_description:
- """
- def match_waypoints(waypoint1, wtransform):
- """
- Check if waypoint1 and wtransform are similar
- """
- dx = float(waypoint1['x']) - wtransform.location.x
- dy = float(waypoint1['y']) - wtransform.location.y
- dz = float(waypoint1['z']) - wtransform.location.z
- dpos = math.sqrt(dx * dx + dy * dy + dz * dz)
- dyaw = (float(waypoint1['yaw']) - wtransform.rotation.yaw) % 360
- return dpos < TRIGGER_THRESHOLD \
- and (dyaw < TRIGGER_ANGLE_THRESHOLD or dyaw > (360 - TRIGGER_ANGLE_THRESHOLD))
- match_position = 0
- # TODO this function can be optimized to run on Log(N) time
- for route_waypoint in route_description:
- if match_waypoints(world_location, route_waypoint[0]):
- return match_position
- match_position += 1
- return None
- @staticmethod
- def get_scenario_type(scenario, match_position, trajectory):
- """
- Some scenarios have different types depending on the route.
- :param scenario: the scenario name
- :param match_position: the matching position for the scenarion
- :param trajectory: the route trajectory the ego is following
- :return: tag representing this subtype
- Also used to check which are not viable (Such as an scenario
- that triggers when turning but the route doesnt')
- WARNING: These tags are used at:
- - VehicleTurningRoute
- - SignalJunctionCrossingRoute
- and changes to these tags will affect them
- """
- def check_this_waypoint(tuple_wp_turn):
- """
- Decides whether or not the waypoint will define the scenario behavior
- """
- if RoadOption.LANEFOLLOW == tuple_wp_turn[1]:
- return False
- elif RoadOption.CHANGELANELEFT == tuple_wp_turn[1]:
- return False
- elif RoadOption.CHANGELANERIGHT == tuple_wp_turn[1]:
- return False
- return True
- # Unused tag for the rest of scenarios,
- # can't be None as they are still valid scenarios
- subtype = 'valid'
- if scenario == 'Scenario4':
- for tuple_wp_turn in trajectory[match_position:]:
- if check_this_waypoint(tuple_wp_turn):
- if RoadOption.LEFT == tuple_wp_turn[1]:
- subtype = 'S4left'
- elif RoadOption.RIGHT == tuple_wp_turn[1]:
- subtype = 'S4right'
- else:
- subtype = None
- break # Avoid checking all of them
- subtype = None
- if scenario == 'Scenario7':
- for tuple_wp_turn in trajectory[match_position:]:
- if check_this_waypoint(tuple_wp_turn):
- if RoadOption.LEFT == tuple_wp_turn[1]:
- subtype = 'S7left'
- elif RoadOption.RIGHT == tuple_wp_turn[1]:
- subtype = 'S7right'
- elif RoadOption.STRAIGHT == tuple_wp_turn[1]:
- subtype = 'S7opposite'
- else:
- subtype = None
- break # Avoid checking all of them
- subtype = None
- if scenario == 'Scenario8':
- for tuple_wp_turn in trajectory[match_position:]:
- if check_this_waypoint(tuple_wp_turn):
- if RoadOption.LEFT == tuple_wp_turn[1]:
- subtype = 'S8left'
- else:
- subtype = None
- break # Avoid checking all of them
- subtype = None
- if scenario == 'Scenario9':
- for tuple_wp_turn in trajectory[match_position:]:
- if check_this_waypoint(tuple_wp_turn):
- if RoadOption.RIGHT == tuple_wp_turn[1]:
- subtype = 'S9right'
- else:
- subtype = None
- break # Avoid checking all of them
- subtype = None
- return subtype
- @staticmethod
- def scan_route_for_scenarios(route_name, trajectory, world_annotations):
- """
- Just returns a plain list of possible scenarios that can happen in this route by matching
- the locations from the scenario into the route description
- :return: A list of scenario definitions with their correspondent parameters
- """
- # the triggers dictionaries:
- existent_triggers = {}
- # We have a table of IDs and trigger positions associated
- possible_scenarios = {}
- # Keep track of the trigger ids being added
- latest_trigger_id = 0
- for town_name in world_annotations.keys():
- if town_name != route_name:
- continue
- scenarios = world_annotations[town_name]
- for scenario in scenarios: # For each existent scenario
- if "scenario_type" not in scenario:
- break
- scenario_name = scenario["scenario_type"]
- for event in scenario["available_event_configurations"]:
- waypoint = event['transform'] # trigger point of this scenario
- RouteParser.convert_waypoint_float(waypoint)
- # We match trigger point to the route, now we need to check if the route affects
- match_position = RouteParser.match_world_location_to_route(
- waypoint, trajectory)
- if match_position is not None:
- # We match a location for this scenario, create a scenario object so this scenario
- # can be instantiated later
- if 'other_actors' in event:
- other_vehicles = event['other_actors']
- else:
- other_vehicles = None
- scenario_subtype = RouteParser.get_scenario_type(scenario_name, match_position,
- trajectory)
- if scenario_subtype is None:
- continue
- scenario_description = {
- 'name': scenario_name,
- 'other_actors': other_vehicles,
- 'trigger_position': waypoint,
- 'scenario_type': scenario_subtype, # some scenarios have route dependent configs
- }
- trigger_id = RouteParser.check_trigger_position(waypoint, existent_triggers)
- if trigger_id is None:
- # This trigger does not exist create a new reference on existent triggers
- existent_triggers.update({latest_trigger_id: waypoint})
- # Update a reference for this trigger on the possible scenarios
- possible_scenarios.update({latest_trigger_id: []})
- trigger_id = latest_trigger_id
- # Increment the latest trigger
- latest_trigger_id += 1
- possible_scenarios[trigger_id].append(scenario_description)
- return possible_scenarios, existent_triggers
|