12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439 |
- #!/usr/bin/env python
- # Copyright (c) 2019-2021 Intel Corporation
- #
- # This work is licensed under the terms of the MIT license.
- # For a copy, see <https://opensource.org/licenses/MIT>.
- """
- This module provides a parser for scenario configuration files based on OpenSCENARIO
- """
- from __future__ import print_function
- import random
- from distutils.util import strtobool
- import re
- import copy
- import datetime
- import math
- import operator
- import random
- import py_trees
- import carla
- from srunner.scenariomanager.carla_data_provider import CarlaDataProvider
- from srunner.scenariomanager.weather_sim import Weather
- from srunner.scenariomanager.scenarioatomics.atomic_behaviors import (TrafficLightStateSetter,
- ActorTransformSetterToOSCPosition,
- RunScript,
- ChangeWeather,
- ChangeAutoPilot,
- ChangeRoadFriction,
- ChangeActorTargetSpeed,
- ChangeActorControl,
- ChangeActorWaypoints,
- ChangeActorLateralMotion,
- ChangeActorLaneOffset,
- SyncArrivalOSC,
- KeepLongitudinalGap,
- Idle,
- ChangeParameter)
- # pylint: disable=unused-import
- # For the following includes the pylint check is disabled, as these are accessed via globals()
- from srunner.scenariomanager.scenarioatomics.atomic_criteria import (CollisionTest,
- MaxVelocityTest,
- DrivenDistanceTest,
- AverageVelocityTest,
- KeepLaneTest,
- ReachedRegionTest,
- OnSidewalkTest,
- WrongLaneTest,
- InRadiusRegionTest,
- InRouteTest,
- RouteCompletionTest,
- RunningRedLightTest,
- RunningStopTest,
- OffRoadTest,
- EndofRoadTest)
- # pylint: enable=unused-import
- from srunner.scenariomanager.scenarioatomics.atomic_trigger_conditions import (InTriggerDistanceToVehicle,
- InTriggerDistanceToOSCPosition,
- InTimeToArrivalToOSCPosition,
- InTimeToArrivalToVehicle,
- DriveDistance,
- StandStill,
- OSCStartEndCondition,
- TriggerAcceleration,
- RelativeVelocityToOtherActor,
- TimeOfDayComparison,
- TriggerVelocity,
- WaitForTrafficLightState,
- CheckParameter)
- from srunner.scenariomanager.timer import TimeOut, SimulationTimeCondition
- from srunner.tools.py_trees_port import oneshot_behavior
- from srunner.tools.scenario_helper import get_offset_transform, get_troad_from_transform
- def oneshot_with_check(variable_name, behaviour, name=None):
- """
- Check if the blackboard contains already variable_name and
- return a oneshot_behavior for behaviour.
- """
- blackboard = py_trees.blackboard.Blackboard()
- # check if the variable_name already exists in the blackboard
- if blackboard.get(variable_name) is not None:
- print("Warning: {} is already used before. Check your XOSC for duplicated names".format(variable_name))
- return oneshot_behavior(variable_name, behaviour, name)
- class ParameterRef:
- """
- This class stores osc parameter reference in its original form.
- Returns the converted value whenever it is used.
- """
- def __init__(self, reference_text) -> None:
- # TODO: (for OSC1.1) add methods(lexer and math_interpreter) to
- # recognize and interpret math expression from reference_text
- self.reference_text = str(reference_text)
- def is_literal(self) -> bool:
- """
- Returns: True when text is a literal/number
- """
- return self._is_matching(pattern=r"(-)?\d+(\.\d*)?")
- def is_parameter(self) -> bool:
- """
- Returns: True when text is a parameter
- """
- return self._is_matching(pattern=r"[$][A-Za-z_][\w]*")
- def _is_matching(self, pattern: str) -> bool:
- """
- Returns: True when pattern is matching with text
- """
- match = re.search(pattern, self.reference_text)
- if match is not None:
- matching_string = match.group()
- return matching_string == self.reference_text
- return False
- def get_interpreted_value(self):
- """
- Returns: interpreted value from reference_text
- """
- if self.is_literal():
- value = self.reference_text
- elif self.is_parameter():
- value = CarlaDataProvider.get_osc_global_param_value(self.reference_text)
- if value is None:
- raise Exception("Parameter '{}' is not defined".format(self.reference_text[1:]))
- else:
- value = None
- return value
- def __float__(self) -> float:
- value = self.get_interpreted_value()
- if value is not None:
- return float(value)
- else:
- raise Exception("could not convert '{}' to float".format(self.reference_text))
- def __int__(self) -> int:
- value = self.get_interpreted_value()
- if value is not None:
- return int(float(value))
- else:
- raise Exception("could not convert '{}' to int".format(self.reference_text))
- def __str__(self) -> str:
- value = self.get_interpreted_value()
- return str(value) if value is not None else self.reference_text
- def __repr__(self):
- value = self.get_interpreted_value()
- return value if value is not None else self.reference_text
- def __radd__(self, other) -> bool:
- return other + self.__float__()
- def __add__(self, other) -> bool:
- return other + self.__float__()
- def __rsub__(self, other) -> bool:
- return other - self.__float__()
- def __sub__(self, other) -> bool:
- return self.__float__() - other
- def __rmul__(self, other) -> bool:
- return other * self.__float__()
- def __mul__(self, other) -> bool:
- return other * self.__float__()
- def __truediv__(self, other) -> bool:
- return self.__float__() / other
- def __rtruediv__(self, other) -> bool:
- return other / self.__float__()
- def __eq__(self, other) -> bool:
- return other == self.__float__()
- def __ne__(self, other) -> bool:
- return other != self.__float__()
- def __ge__(self, other) -> bool:
- return self.__float__() >= other
- def __le__(self, other) -> bool:
- return self.__float__() <= other
- def __gt__(self, other) -> bool:
- return self.__float__() > other
- def __lt__(self, other) -> bool:
- return self.__float__() < other
- def __GE__(self, other) -> bool: # pylint: disable=invalid-name
- return self.__float__() >= other
- def __LE__(self, other) -> bool: # pylint: disable=invalid-name
- return self.__float__() <= other
- def __GT__(self, other) -> bool: # pylint: disable=invalid-name
- return self.__float__() > other
- def __LT__(self, other) -> bool: # pylint: disable=invalid-name
- return self.__float__() < other
- def __iadd__(self, other) -> bool:
- return self.__float__() + other
- def __isub__(self, other) -> bool:
- return self.__float__() - other
- def __abs__(self):
- return abs(self.__float__())
- class OpenScenarioParser(object):
- """
- Pure static class providing conversions from OpenSCENARIO elements to ScenarioRunner elements
- """
- operators = {
- "greaterThan": operator.gt,
- "lessThan": operator.lt,
- "equalTo": operator.eq,
- "greaterOrEqual": operator.ge,
- "lessOrEqual": operator.le,
- "notEqualTo": operator.ne
- }
- actor_types = {
- "pedestrian": "walker",
- "vehicle": "vehicle",
- "miscellaneous": "miscellaneous"
- }
- tl_states = {
- "GREEN": carla.TrafficLightState.Green,
- "YELLOW": carla.TrafficLightState.Yellow,
- "RED": carla.TrafficLightState.Red,
- "OFF": carla.TrafficLightState.Off,
- }
- global_osc_parameters = {}
- use_carla_coordinate_system = False
- osc_filepath = None
- @staticmethod
- def get_traffic_light_from_osc_name(name):
- """
- Returns a carla.TrafficLight instance that matches the name given
- """
- traffic_light = None
- # Given by id
- if name.startswith("id="):
- tl_id = int(name[3:])
- for carla_tl in CarlaDataProvider.get_world().get_actors().filter('traffic.traffic_light'):
- if carla_tl.id == tl_id:
- traffic_light = carla_tl
- break
- # Given by position
- elif name.startswith("pos="):
- tl_pos = name[4:]
- pos = tl_pos.split(",")
- for carla_tl in CarlaDataProvider.get_world().get_actors().filter('traffic.traffic_light'):
- carla_tl_location = carla_tl.get_transform().location
- distance = carla_tl_location.distance(carla.Location(float(pos[0]),
- float(pos[1]),
- carla_tl_location.z))
- if distance < 2.0:
- traffic_light = carla_tl
- break
- if traffic_light is None:
- raise AttributeError("Unknown traffic light {}".format(name))
- return traffic_light
- @staticmethod
- def set_osc_filepath(filepath):
- """
- Set path of OSC file. This is required if for example custom commands are provided with
- relative paths.
- """
- OpenScenarioParser.osc_filepath = filepath
- @staticmethod
- def set_use_carla_coordinate_system():
- """
- CARLA internally uses a left-hand coordinate system (Unreal), but OpenSCENARIO and OpenDRIVE
- are intended for right-hand coordinate system. Hence, we need to invert the coordinates, if
- the scenario does not use CARLA coordinates, but instead right-hand coordinates.
- """
- OpenScenarioParser.use_carla_coordinate_system = True
- @staticmethod
- def set_parameters(xml_tree, additional_parameter_dict=None):
- """
- Parse the xml_tree, and replace all parameter references
- with the actual values.
- Note: Parameter names must not start with "$", however when referencing a parameter the
- reference has to start with "$".
- https://releases.asam.net/OpenSCENARIO/1.0.0/ASAM_OpenSCENARIO_BS-1-2_User-Guide_V1-0-0.html#_re_use_mechanisms
- Args:
- xml_tree: Containing all nodes that should be updated
- additional_parameter_dict (dictionary): Additional parameters as dict (key, value). Optional.
- returns:
- updated xml_tree, dictonary containing all parameters and their values
- """
- parameter_dict = {}
- parameters = xml_tree.find('ParameterDeclarations')
- if parameters is None and not parameter_dict:
- return xml_tree, parameter_dict
- if parameters is None:
- parameters = []
- for parameter in parameters:
- name = parameter.attrib.get('name')
- value = parameter.attrib.get('value')
- parameter_dict[name] = value
- # overwrite parameters in parameters_dict by additional_parameters_dict
- if additional_parameter_dict is not None:
- parameter_dict = dict(list(parameter_dict.items()) + list(additional_parameter_dict.items()))
- return xml_tree, parameter_dict
- @staticmethod
- def set_global_parameters(parameter_dict):
- """
- Set global_osc_parameter dictionary
- Args:
- parameter_dict (Dictionary): Input for global_osc_parameter
- """
- OpenScenarioParser.global_osc_parameters = parameter_dict
- CarlaDataProvider.update_osc_global_params(parameter_dict)
- @staticmethod
- def get_catalog_entry(catalogs, catalog_reference):
- """
- Get catalog entry referenced by catalog_reference included correct parameter settings
- Args:
- catalogs (Dictionary of dictionaries): List of all catalogs and their entries
- catalog_reference (XML ElementTree): Reference containing the exact catalog to be used
- returns:
- Catalog entry (XML ElementTree)
- """
- entry_name = str(ParameterRef(catalog_reference.attrib.get("entryName")))
- entry = catalogs[catalog_reference.attrib.get("catalogName")][entry_name]
- entry_copy = copy.deepcopy(entry)
- catalog_copy = copy.deepcopy(catalog_reference)
- entry = OpenScenarioParser.assign_catalog_parameters(entry_copy, catalog_copy)
- return entry
- @staticmethod
- def assign_catalog_parameters(entry_instance, catalog_reference):
- """
- Parse catalog_reference, and replace all parameter references
- in entry_instance by the values provided in catalog_reference.
- Not to be used from outside this class.
- Args:
- entry_instance (XML ElementTree): Entry to be updated
- catalog_reference (XML ElementTree): Reference containing the exact parameter values
- returns:
- updated entry_instance with updated parameter values
- """
- parameter_dict = {}
- for elem in entry_instance.iter():
- if elem.find('ParameterDeclarations') is not None:
- parameters = elem.find('ParameterDeclarations')
- for parameter in parameters:
- name = parameter.attrib.get('name')
- value = parameter.attrib.get('value')
- parameter_dict[name] = value
- for parameter_assignments in catalog_reference.iter("ParameterAssignments"):
- for parameter_assignment in parameter_assignments.iter("ParameterAssignment"):
- parameter = parameter_assignment.attrib.get("parameterRef")
- value = parameter_assignment.attrib.get("value")
- parameter_dict[parameter] = value
- for node in entry_instance.iter():
- for key in node.attrib:
- for param in sorted(parameter_dict, key=len, reverse=True):
- if "$" + param in node.attrib[key]:
- node.attrib[key] = node.attrib[key].replace("$" + param, parameter_dict[param])
- OpenScenarioParser.set_parameters(entry_instance, OpenScenarioParser.global_osc_parameters)
- return entry_instance
- @staticmethod
- def get_friction_from_env_action(xml_tree, catalogs):
- """
- Extract the CARLA road friction coefficient from an OSC EnvironmentAction
- Args:
- xml_tree: Containing the EnvironmentAction,
- or the reference to the catalog it is defined in.
- catalogs: XML Catalogs that could contain the EnvironmentAction
- returns:
- friction (float)
- """
- if xml_tree.findall('.//EnvironmentAction'):
- node = xml_tree.findall('.//EnvironmentAction')[0]
- set_environment = next(node.iter("EnvironmentAction"))
- else:
- return 1.0
- if sum(1 for _ in set_environment.iter("Weather")) != 0:
- environment = set_environment.find("Environment")
- elif set_environment.find("CatalogReference") is not None:
- catalog_reference = set_environment.find("CatalogReference")
- environment = OpenScenarioParser.get_catalog_entry(catalogs, catalog_reference)
- friction = 1.0
- road_condition = environment.iter("RoadCondition")
- for condition in road_condition:
- friction = condition.attrib.get('frictionScaleFactor')
- return friction
- @staticmethod
- def get_weather_from_env_action(xml_tree, catalogs):
- """
- Extract the CARLA weather parameters from an OSC EnvironmentAction
- Args:
- xml_tree: Containing the EnvironmentAction,
- or the reference to the catalog it is defined in.
- catalogs: XML Catalogs that could contain the EnvironmentAction
- returns:
- Weather (srunner.scenariomanager.weather_sim.Weather)
- """
- if xml_tree.findall('.//EnvironmentAction'):
- node = xml_tree.findall('.//EnvironmentAction')[0]
- set_environment = next(node.iter("EnvironmentAction"))
- else:
- return Weather(carla.WeatherParameters())
- if sum(1 for _ in set_environment.iter("Weather")) != 0:
- environment = set_environment.find("Environment")
- elif set_environment.find("CatalogReference") is not None:
- catalog_reference = set_environment.find("CatalogReference")
- environment = OpenScenarioParser.get_catalog_entry(catalogs, catalog_reference)
- weather = environment.find("Weather")
- sun = weather.find("Sun")
- carla_weather = carla.WeatherParameters()
- carla_weather.sun_azimuth_angle = math.degrees(float(sun.attrib.get('azimuth', 0)))
- carla_weather.sun_altitude_angle = math.degrees(float(sun.attrib.get('elevation', 0)))
- carla_weather.cloudiness = 100 - float(sun.attrib.get('intensity', 0)) * 100
- fog = weather.find("Fog")
- carla_weather.fog_distance = float(fog.attrib.get('visualRange', 'inf'))
- if carla_weather.fog_distance >=20:
- # carla_weather.fog_density=25-0.125*(float(carla_weather.fog_distance))
- carla_weather.fog_density=(200-float(carla_weather.fog_distance))/9
- else:
- # carla_weather.fog_density=100-2*(float(carla_weather.fog_distance))
- carla_weather.fog_density=100-4*(float(carla_weather.fog_distance))
- print(carla_weather.fog_distance,carla_weather.fog_density)
- carla_weather.precipitation = 0
- carla_weather.precipitation_deposits = 0
- carla_weather.wetness = 0
- carla_weather.wind_intensity = 30.0
- precepitation = weather.find("Precipitation")
- if precepitation.attrib.get('precipitationType') == "rain":
- precipitation = float(precepitation.attrib.get('intensity')) * 100
- if precipitation<=60:
- carla_weather.precipitation=precipitation*0.5
- else:
- # carla_weather.precipitation=precipitation*(4/3.0)-(100/3.0)
- carla_weather.precipitation=precipitation*(7/4.0)-75
- carla_weather.precipitation_deposits = carla_weather.precipitation # if it rains, make the road wet
- print(carla_weather.precipitation_deposits)
- #TODO
- carla_weather.wetness = carla_weather.precipitation
- elif precepitation.attrib.get('type') == "snow":
- raise AttributeError("CARLA does not support snow precipitation")
- time_of_day = environment.find("TimeOfDay")
- weather_animation = strtobool(time_of_day.attrib.get("animation"))
- time = time_of_day.attrib.get("dateTime")
- dtime = datetime.datetime.strptime(time, "%Y-%m-%dT%H:%M:%S")
- return Weather(carla_weather, dtime, weather_animation)
- @staticmethod
- def get_controller(xml_tree, catalogs):
- """
- Extract the object controller from the OSC XML or a catalog
- Args:
- xml_tree: Containing the controller information,
- or the reference to the catalog it is defined in.
- catalogs: XML Catalogs that could contain the EnvironmentAction
- returns:
- module: Python module containing the controller implementation
- args: Dictonary with (key, value) parameters for the controller
- """
- assign_action = next(xml_tree.iter("AssignControllerAction"))
- properties = None
- if assign_action.find('Controller') is not None:
- properties = assign_action.find('Controller').find('Properties')
- elif assign_action.find("CatalogReference") is not None:
- catalog_reference = assign_action.find("CatalogReference")
- properties = OpenScenarioParser.get_catalog_entry(catalogs, catalog_reference).find('Properties')
- module = None
- args = {}
- for prop in properties:
- if prop.attrib.get('name') == "module":
- module = prop.attrib.get('value')
- else:
- args[prop.attrib.get('name')] = prop.attrib.get('value')
- override_action = xml_tree.find('OverrideControllerValueAction')
- for child in override_action:
- if strtobool(child.attrib.get('active')):
- raise NotImplementedError("Controller override actions are not yet supported")
- return module, args
- @staticmethod
- def get_route(xml_tree, catalogs):
- """
- Extract the route from the OSC XML or a catalog
- Args:
- xml_tree: Containing the route information,
- or the reference to the catalog it is defined in.
- catalogs: XML Catalogs that could contain the Route
- returns:
- waypoints: List of route waypoints = (waypoint, routing strategy)
- where the strategy is a string indicating if the fastest/shortest/etc.
- route should be used.
- """
- route = None
- if xml_tree.find('Route') is not None:
- route = xml_tree.find('Route')
- elif xml_tree.find('CatalogReference') is not None:
- catalog_reference = xml_tree.find("CatalogReference")
- route = OpenScenarioParser.get_catalog_entry(catalogs, catalog_reference)
- else:
- raise AttributeError("Unknown private FollowRoute action")
- waypoints = []
- if route is not None:
- for waypoint in route.iter('Waypoint'):
- position = waypoint.find('Position')
- routing_option = str(waypoint.attrib.get('routeStrategy'))
- waypoints.append((position, routing_option))
- else:
- raise AttributeError("No waypoints has been set")
- return waypoints
- @staticmethod
- def get_trajectory(xml_tree, catalogs):
- """
- Extract the trajectory from the OSC XML or a catalog
- Args:
- xml_tree: Containing the trajectory information,
- or the reference to the catalog it is defined in.
- catalogs: XML Catalogs that could contain the trajectory
- returns:
- waypoints: List of trajectory waypoints and times.
- Only polylines are supported
- """
- trajectory = None
- waypoints = []
- times = []
- if xml_tree.find('Trajectory') is not None:
- trajectory = xml_tree.find('Trajectory')
- elif xml_tree.find('CatalogReference') is not None:
- catalog_reference = xml_tree.find("CatalogReference")
- trajectory = OpenScenarioParser.get_catalog_entry(catalogs, catalog_reference)
- else:
- raise AttributeError("Unknown private FollowTrajectory action")
- if trajectory is not None:
- shape = trajectory.find('Shape')
- if shape.find('Polyline') is not None:
- line = shape.find('Polyline')
- for vertex in line.iter('Vertex'):
- times.append(float(vertex.get('time')))
- waypoints.append(vertex.find('Position'))
- elif shape.find('Clothoid') is not None:
- raise AttributeError("Clothoid shapes are currently unsupported")
- elif shape.find('Nurbs') is not None:
- raise AttributeError("Nurbs shapes are currently unsupported")
- else:
- raise AttributeError("Requested shape {} isn't a valid shape".format(shape))
- else:
- raise AttributeError("No waypoints has been set")
- return waypoints, times
- @staticmethod
- def convert_position_to_transform(position, actor_list=None):
- """
- Convert an OpenScenario position into a CARLA transform
- Not supported: RoutePosition
- """
- if position.find('WorldPosition') is not None:
- world_pos = position.find('WorldPosition')
- x = float(ParameterRef(world_pos.attrib.get('x', 0)))
- y = float(ParameterRef(world_pos.attrib.get('y', 0)))
- z = float(ParameterRef(world_pos.attrib.get('z', 0)))
- yaw = math.degrees(float(ParameterRef(world_pos.attrib.get('h', 0))))
- pitch = math.degrees(float(ParameterRef(world_pos.attrib.get('p', 0))))
- roll = math.degrees(float(ParameterRef(world_pos.attrib.get('r', 0))))
- if not OpenScenarioParser.use_carla_coordinate_system:
- y = y * (-1.0)
- yaw = yaw * (-1.0)
- return carla.Transform(carla.Location(x=x, y=y, z=z), carla.Rotation(yaw=yaw, pitch=pitch, roll=roll))
- elif ((position.find('RelativeWorldPosition') is not None) or
- (position.find('RelativeObjectPosition') is not None) or
- (position.find('RelativeLanePosition') is not None) or
- (position.find('RelativeRoadPosition') is not None)):
- if position.find('RelativeWorldPosition') is not None:
- rel_pos = position.find('RelativeWorldPosition')
- if position.find('RelativeObjectPosition') is not None:
- rel_pos = position.find('RelativeObjectPosition')
- if position.find('RelativeLanePosition') is not None:
- rel_pos = position.find('RelativeLanePosition')
- if position.find('RelativeRoadPosition') is not None:
- rel_pos = position.find('RelativeRoadPosition')
- # get relative object and relative position
- obj = rel_pos.attrib.get('entityRef')
- obj_actor = None
- actor_transform = None
- if actor_list is not None:
- for actor in actor_list:
- if actor.rolename == obj:
- obj_actor = actor
- actor_transform = actor.transform
- else:
- for actor in CarlaDataProvider.get_world().get_actors():
- if 'role_name' in actor.attributes and actor.attributes['role_name'] == obj:
- obj_actor = actor
- actor_transform = obj_actor.get_transform()
- break
- if obj_actor is None or actor_transform is None:
- raise AttributeError("Object '{}' provided as position reference is not known".format(obj))
- # calculate orientation h, p, r
- is_absolute = False
- dyaw = 0
- dpitch = 0
- droll = 0
- if rel_pos.find('Orientation') is not None:
- orientation = rel_pos.find('Orientation')
- is_absolute = (orientation.attrib.get('type') == "absolute")
- dyaw = math.degrees(float(ParameterRef(orientation.attrib.get('h', 0))))
- dpitch = math.degrees(float(ParameterRef(orientation.attrib.get('p', 0))))
- droll = math.degrees(float(ParameterRef(orientation.attrib.get('r', 0))))
- if not OpenScenarioParser.use_carla_coordinate_system:
- dyaw = dyaw * (-1.0)
- yaw = actor_transform.rotation.yaw
- pitch = actor_transform.rotation.pitch
- roll = actor_transform.rotation.roll
- if not is_absolute:
- yaw = yaw + dyaw
- pitch = pitch + dpitch
- roll = roll + droll
- else:
- yaw = dyaw
- pitch = dpitch
- roll = droll
- # calculate location x, y, z
- # dx, dy, dz
- if ((position.find('RelativeWorldPosition') is not None) or
- (position.find('RelativeObjectPosition') is not None)):
- dx = float(ParameterRef(rel_pos.attrib.get('dx', 0)))
- dy = float(ParameterRef(rel_pos.attrib.get('dy', 0)))
- dz = float(ParameterRef(rel_pos.attrib.get('dz', 0)))
- if not OpenScenarioParser.use_carla_coordinate_system:
- dy = dy * (-1.0)
- x = actor_transform.location.x + dx
- y = actor_transform.location.y + dy
- z = actor_transform.location.z + dz
- transform = carla.Transform(carla.Location(x=x, y=y, z=z),
- carla.Rotation(yaw=yaw, pitch=pitch, roll=roll))
- elif position.find('RelativeLanePosition') is not None:
- dlane = float(ParameterRef(rel_pos.attrib.get('dLane')))
- ds = float(ParameterRef(rel_pos.attrib.get('ds')))
- offset = float(ParameterRef(rel_pos.attrib.get('offset', 0.0)))
- carla_map = CarlaDataProvider.get_map()
- relative_waypoint = carla_map.get_waypoint(actor_transform.location)
- road_id, ref_lane_id, ref_s = relative_waypoint.road_id, relative_waypoint.lane_id, relative_waypoint.s
- target_lane_id = int(ref_lane_id + dlane)
- waypoint = CarlaDataProvider.get_map().get_waypoint_xodr(road_id, target_lane_id, ref_s)
- if waypoint is not None:
- if ds < 0:
- ds = (-1.0) * ds
- waypoint = waypoint.previous(ds)[-1]
- else:
- waypoint = waypoint.next(ds)[-1]
- if waypoint is None:
- raise AttributeError("RelativeLanePosition " +
- "'roadId={} with ds={} and lane_id={}' does not exist".format(road_id,
- ds,
- target_lane_id))
- transform = waypoint.transform
- transform.rotation.yaw = yaw
- transform.rotation.pitch = pitch
- transform.rotation.roll = roll
- # Adapt transform according to offset
- if abs(offset) == waypoint.lane_width:
- # if the offset position is exactly on lane edge its difficult to find out which lane its on.
- # so, when offset is on lane edge/corner adjust it to stay inside the lane
- # if user wants to offset out of lane then make sure offset > waypoint.lane_width
- if offset > 0:
- offset = offset - 0.05
- elif offset < 0:
- offset = offset + 0.05
- transform = get_offset_transform(transform, offset)
- elif position.find('RelativeRoadPosition') is not None:
- ds = float(ParameterRef(rel_pos.attrib.get('ds')))
- dt = float(ParameterRef(rel_pos.attrib.get('dt', 0.0)))
- troad = get_troad_from_transform
- carla_map = CarlaDataProvider.get_map()
- relative_waypoint = carla_map.get_waypoint(actor_transform.location)
- road_id, ref_lane_id, ref_s = relative_waypoint.road_id, relative_waypoint.lane_id, relative_waypoint.s
- target_t, target_s = troad(relative_waypoint.transform) - troad(actor_transform) + dt, ref_s + ds
- waypoint = CarlaDataProvider.get_map().get_waypoint_xodr(road_id, ref_lane_id, target_s)
- if waypoint is None:
- raise AttributeError("RelativeRoadPosition 'roadId={} with s={} and t={}' does not exist".format(
- road_id, target_s, target_t))
- transform = waypoint.transform
- transform.rotation.yaw = yaw
- transform.rotation.pitch = pitch
- transform.rotation.roll = roll
- transform = get_offset_transform(transform, target_t)
- return transform
- elif position.find('RoadPosition') is not None:
- road_pos = position.find('RoadPosition')
- road_id = int(ParameterRef(road_pos.attrib.get('roadId', 0)))
- t = float(ParameterRef(road_pos.attrib.get('t', 0)))
- s = float(ParameterRef(road_pos.attrib.get('s', 0)))
- waypoint = CarlaDataProvider.get_map().get_waypoint_xodr(road_id, 0, s)
- if waypoint is None:
- raise AttributeError("RoadPosition 'roadId={} with s={} and t={}' does not exist".format(road_id, s, t))
- transform = waypoint.transform
- if road_pos.find('Orientation') is not None:
- orientation = road_pos.find('Orientation')
- dyaw = math.degrees(float(ParameterRef(orientation.attrib.get('h', 0))))
- dpitch = math.degrees(float(ParameterRef(orientation.attrib.get('p', 0))))
- droll = math.degrees(float(ParameterRef(orientation.attrib.get('r', 0))))
- if not OpenScenarioParser.use_carla_coordinate_system:
- dyaw = dyaw * (-1.0)
- transform.rotation.yaw = transform.rotation.yaw + dyaw
- transform.rotation.pitch = transform.rotation.pitch + dpitch
- transform.rotation.roll = transform.rotation.roll + droll
- if not OpenScenarioParser.use_carla_coordinate_system:
- # multiply -1 because unlike offset t road is -ve for right side
- t = -1*t
- transform = get_offset_transform(transform, t)
- return transform
- elif position.find('LanePosition') is not None:
- lane_pos = position.find('LanePosition')
- road_id = int(ParameterRef(lane_pos.attrib.get('roadId', 0)))
- lane_id = int(ParameterRef(lane_pos.attrib.get('laneId', 0)))
- offset = float(ParameterRef(lane_pos.attrib.get('offset', 0)))
- s = float(ParameterRef(lane_pos.attrib.get('s', 0)))
- waypoint = CarlaDataProvider.get_map().get_waypoint_xodr(road_id, lane_id, s)
- if waypoint is None:
- raise AttributeError("LanePosition 'roadId={}, laneId={}, s={}, offset={}' does not exist".format(
- road_id, lane_id, s, offset))
- transform = waypoint.transform
- if lane_pos.find('Orientation') is not None:
- orientation = lane_pos.find('Orientation')
- dyaw = math.degrees(float(ParameterRef(orientation.attrib.get('h', 0))))
- dpitch = math.degrees(float(ParameterRef(orientation.attrib.get('p', 0))))
- droll = math.degrees(float(ParameterRef(orientation.attrib.get('r', 0))))
- if not OpenScenarioParser.use_carla_coordinate_system:
- dyaw = dyaw * (-1.0)
- transform.rotation.yaw = transform.rotation.yaw + dyaw
- transform.rotation.pitch = transform.rotation.pitch + dpitch
- transform.rotation.roll = transform.rotation.roll + droll
- if offset != 0:
- forward_vector = transform.rotation.get_forward_vector()
- orthogonal_vector = carla.Vector3D(x=-forward_vector.y, y=forward_vector.x, z=forward_vector.z)
- transform.location.x = transform.location.x + offset * orthogonal_vector.x
- transform.location.y = transform.location.y + offset * orthogonal_vector.y
- return transform
- elif position.find('RoutePosition') is not None:
- raise NotImplementedError("Route positions are not yet supported")
- else:
- raise AttributeError("Unknown position")
- @staticmethod
- def convert_condition_to_atomic(condition, actor_list):
- """
- Convert an OpenSCENARIO condition into a Behavior/Criterion atomic
- If there is a delay defined in the condition, then the condition is checked after the delay time
- passed by, e.g. <Condition name="" delay="5">.
- Note: Not all conditions are currently supported.
- """
- atomic = None
- delay_atomic = None
- condition_name = condition.attrib.get('name')
- if condition.attrib.get('delay') is not None and float(condition.attrib.get('delay')) != 0:
- delay = float(condition.attrib.get('delay'))
- delay_atomic = TimeOut(delay)
- if condition.find('ByEntityCondition') is not None:
- trigger_actor = None # A-priori validation ensures that this will be not None
- triggered_actor = None
- for triggering_entities in condition.find('ByEntityCondition').iter('TriggeringEntities'):
- for entity in triggering_entities.iter('EntityRef'):
- for actor in actor_list:
- if actor is not None and entity.attrib.get('entityRef', None) == actor.attributes['role_name']:
- trigger_actor = actor
- break
- for entity_condition in condition.find('ByEntityCondition').iter('EntityCondition'):
- if entity_condition.find('EndOfRoadCondition') is not None:
- end_road_condition = entity_condition.find('EndOfRoadCondition')
- condition_duration = ParameterRef(end_road_condition.attrib.get('duration'))
- atomic_cls = py_trees.meta.inverter(EndofRoadTest)
- atomic = atomic_cls(trigger_actor, condition_duration, terminate_on_failure=True,
- name=condition_name)
- elif entity_condition.find('CollisionCondition') is not None:
- collision_condition = entity_condition.find('CollisionCondition')
- if collision_condition.find('EntityRef') is not None:
- collision_entity = collision_condition.find('EntityRef')
- for actor in actor_list:
- if collision_entity.attrib.get('entityRef', None) == actor.attributes['role_name']:
- triggered_actor = actor
- break
- if triggered_actor is None:
- raise AttributeError("Cannot find actor '{}' for condition".format(
- collision_condition.attrib.get('entityRef', None)))
- atomic_cls = py_trees.meta.inverter(CollisionTest)
- atomic = atomic_cls(trigger_actor, other_actor=triggered_actor, terminate_on_failure=True,
- name=condition_name)
- elif collision_condition.find('ByType') is not None:
- collision_type = collision_condition.find('ByType').attrib.get('type', None)
- triggered_type = OpenScenarioParser.actor_types[collision_type]
- atomic_cls = py_trees.meta.inverter(CollisionTest)
- atomic = atomic_cls(trigger_actor, other_actor_type=triggered_type, terminate_on_failure=True,
- name=condition_name)
- else:
- atomic_cls = py_trees.meta.inverter(CollisionTest)
- atomic = atomic_cls(trigger_actor, terminate_on_failure=True, name=condition_name)
- elif entity_condition.find('OffroadCondition') is not None:
- off_condition = entity_condition.find('OffroadCondition')
- condition_duration = ParameterRef(off_condition.attrib.get('duration'))
- atomic_cls = py_trees.meta.inverter(OffRoadTest)
- atomic = atomic_cls(trigger_actor, condition_duration, terminate_on_failure=True,
- name=condition_name)
- elif entity_condition.find('TimeHeadwayCondition') is not None:
- headtime_condition = entity_condition.find('TimeHeadwayCondition')
- condition_value = ParameterRef(headtime_condition.attrib.get('value'))
- condition_rule = headtime_condition.attrib.get('rule')
- condition_operator = OpenScenarioParser.operators[condition_rule]
- condition_freespace = strtobool(headtime_condition.attrib.get('freespace', False))
- condition_along_route = strtobool(headtime_condition.attrib.get('alongRoute', False))
- for actor in actor_list:
- if headtime_condition.attrib.get('entityRef', None) == actor.attributes['role_name']:
- triggered_actor = actor
- break
- if triggered_actor is None:
- raise AttributeError("Cannot find actor '{}' for condition".format(
- headtime_condition.attrib.get('entityRef', None)))
- atomic = InTimeToArrivalToVehicle(
- trigger_actor, triggered_actor, condition_value, condition_freespace,
- condition_along_route, condition_operator, condition_name
- )
- elif entity_condition.find('TimeToCollisionCondition') is not None:
- ttc_condition = entity_condition.find('TimeToCollisionCondition')
- condition_rule = ttc_condition.attrib.get('rule')
- condition_operator = OpenScenarioParser.operators[condition_rule]
- condition_value = ParameterRef(ttc_condition.attrib.get('value'))
- condition_target = ttc_condition.find('TimeToCollisionConditionTarget')
- entity_ref_ = condition_target.find('EntityRef')
- condition_freespace = strtobool(ttc_condition.attrib.get('freespace', False))
- condition_along_route = strtobool(ttc_condition.attrib.get('alongRoute', False))
- if condition_target.find('Position') is not None:
- position = condition_target.find('Position')
- atomic = InTimeToArrivalToOSCPosition(
- trigger_actor, position, condition_value, condition_along_route, condition_operator)
- else:
- for actor in actor_list:
- if entity_ref_.attrib.get('entityRef', None) == actor.attributes['role_name']:
- triggered_actor = actor
- break
- if triggered_actor is None:
- raise AttributeError("Cannot find actor '{}' for condition".format(
- entity_ref_.attrib.get('entityRef', None)))
- atomic = InTimeToArrivalToVehicle(
- trigger_actor, triggered_actor, condition_value, condition_freespace,
- condition_along_route, condition_operator, condition_name)
- elif entity_condition.find('AccelerationCondition') is not None:
- accel_condition = entity_condition.find('AccelerationCondition')
- condition_value = ParameterRef(accel_condition.attrib.get('value'))
- condition_rule = accel_condition.attrib.get('rule')
- condition_operator = OpenScenarioParser.operators[condition_rule]
- atomic = TriggerAcceleration(
- trigger_actor, condition_value, condition_operator, condition_name)
- elif entity_condition.find('StandStillCondition') is not None:
- ss_condition = entity_condition.find('StandStillCondition')
- duration = ParameterRef(ss_condition.attrib.get('duration'))
- atomic = StandStill(trigger_actor, condition_name, duration)
- elif entity_condition.find('SpeedCondition') is not None:
- spd_condition = entity_condition.find('SpeedCondition')
- condition_value = ParameterRef(spd_condition.attrib.get('value'))
- condition_rule = spd_condition.attrib.get('rule')
- condition_operator = OpenScenarioParser.operators[condition_rule]
- atomic = TriggerVelocity(
- trigger_actor, condition_value, condition_operator, condition_name)
- elif entity_condition.find('RelativeSpeedCondition') is not None:
- relspd_condition = entity_condition.find('RelativeSpeedCondition')
- condition_value = ParameterRef(relspd_condition.attrib.get('value'))
- condition_rule = relspd_condition.attrib.get('rule')
- condition_operator = OpenScenarioParser.operators[condition_rule]
- for actor in actor_list:
- if relspd_condition.attrib.get('entityRef', None) == actor.attributes['role_name']:
- triggered_actor = actor
- break
- if triggered_actor is None:
- raise AttributeError("Cannot find actor '{}' for condition".format(
- relspd_condition.attrib.get('entityRef', None)))
- atomic = RelativeVelocityToOtherActor(
- trigger_actor, triggered_actor, condition_value, condition_operator, condition_name)
- elif entity_condition.find('TraveledDistanceCondition') is not None:
- distance_condition = entity_condition.find('TraveledDistanceCondition')
- distance_value = ParameterRef(distance_condition.attrib.get('value'))
- atomic = DriveDistance(trigger_actor, distance_value, name=condition_name)
- elif entity_condition.find('ReachPositionCondition') is not None:
- rp_condition = entity_condition.find('ReachPositionCondition')
- distance_value = ParameterRef(rp_condition.attrib.get('tolerance'))
- position = rp_condition.find('Position')
- atomic = InTriggerDistanceToOSCPosition(
- trigger_actor, position, distance_value, name=condition_name)
- elif entity_condition.find('DistanceCondition') is not None:
- distance_condition = entity_condition.find('DistanceCondition')
- distance_value = ParameterRef(distance_condition.attrib.get('value'))
- distance_rule = distance_condition.attrib.get('rule')
- distance_operator = OpenScenarioParser.operators[distance_rule]
- distance_freespace = strtobool(distance_condition.attrib.get('freespace', False))
- if distance_freespace:
- raise NotImplementedError(
- "DistanceCondition: freespace attribute is currently not implemented")
- distance_along_route = strtobool(distance_condition.attrib.get('alongRoute', False))
- if distance_condition.find('Position') is not None:
- position = distance_condition.find('Position')
- atomic = InTriggerDistanceToOSCPosition(
- trigger_actor, position, distance_value, distance_along_route,
- distance_operator, name=condition_name)
- elif entity_condition.find('RelativeDistanceCondition') is not None:
- distance_condition = entity_condition.find('RelativeDistanceCondition')
- distance_value = ParameterRef(distance_condition.attrib.get('value'))
- distance_freespace = distance_condition.attrib.get('freespace', "false") == "true"
- rel_dis_type = distance_condition.attrib.get('relativeDistanceType')
- for actor in actor_list:
- if distance_condition.attrib.get('entityRef', None) == actor.attributes['role_name']:
- triggered_actor = actor
- break
- if triggered_actor is None:
- raise AttributeError("Cannot find actor '{}' for condition".format(
- distance_condition.attrib.get('entityRef', None)))
- condition_rule = distance_condition.attrib.get('rule')
- condition_operator = OpenScenarioParser.operators[condition_rule]
- atomic = InTriggerDistanceToVehicle(triggered_actor, trigger_actor, distance_value,
- condition_operator, distance_type=rel_dis_type,
- freespace=distance_freespace, name=condition_name)
- elif condition.find('ByValueCondition') is not None:
- value_condition = condition.find('ByValueCondition')
- if value_condition.find('ParameterCondition') is not None:
- parameter_condition = value_condition.find('ParameterCondition')
- arg_name = parameter_condition.attrib.get('parameterRef')
- value = ParameterRef(parameter_condition.attrib.get('value'))
- rule = parameter_condition.attrib.get('rule')
- if condition_name.startswith('criteria_'):
- if str(value) != '':
- arg_value = float(value)
- else:
- arg_value = 0
- class_name = condition_name[9:]
- if class_name in globals():
- criterion_instance = globals()[class_name]
- else:
- raise AttributeError(
- "The condition {} cannot be mapped to a criterion atomic".format(class_name))
- atomic = py_trees.composites.Parallel("Evaluation Criteria for multiple ego vehicles")
- for triggered_actor in actor_list:
- if arg_name != '':
- atomic.add_child(criterion_instance(triggered_actor, arg_value))
- else:
- atomic.add_child(criterion_instance(triggered_actor))
- else:
- atomic = CheckParameter(arg_name, value, OpenScenarioParser.operators[rule], name=condition_name)
- elif value_condition.find('SimulationTimeCondition') is not None:
- simtime_condition = value_condition.find('SimulationTimeCondition')
- value = ParameterRef(simtime_condition.attrib.get('value'))
- rule = OpenScenarioParser.operators[simtime_condition.attrib.get('rule')]
- atomic = SimulationTimeCondition(value, comparison_operator=rule)
- elif value_condition.find('TimeOfDayCondition') is not None:
- tod_condition = value_condition.find('TimeOfDayCondition')
- condition_date = tod_condition.attrib.get('dateTime')
- condition_rule = tod_condition.attrib.get('rule')
- condition_operator = OpenScenarioParser.operators[condition_rule]
- atomic = TimeOfDayComparison(condition_date, condition_operator, condition_name)
- elif value_condition.find('StoryboardElementStateCondition') is not None:
- state_condition = value_condition.find('StoryboardElementStateCondition')
- element_name = state_condition.attrib.get('storyboardElementRef')
- element_type = state_condition.attrib.get('storyboardElementType')
- state = state_condition.attrib.get('state')
- if state == "startTransition":
- atomic = OSCStartEndCondition(element_type, element_name, rule="START", name=state + "Condition")
- elif state in ["stopTransition", "endTransition", "completeState"]:
- atomic = OSCStartEndCondition(element_type, element_name, rule="END", name=state + "Condition")
- else:
- raise NotImplementedError(
- "Only start, stop, endTransitions and completeState are currently supported")
- elif value_condition.find('UserDefinedValueCondition') is not None:
- raise NotImplementedError("ByValue UserDefinedValue conditions are not yet supported")
- elif value_condition.find('TrafficSignalCondition') is not None:
- tl_condition = value_condition.find('TrafficSignalCondition')
- name_condition = tl_condition.attrib.get('name')
- traffic_light = OpenScenarioParser.get_traffic_light_from_osc_name(name_condition)
- tl_state = tl_condition.attrib.get('state').upper()
- if tl_state not in OpenScenarioParser.tl_states:
- raise KeyError("CARLA only supports Green, Red, Yellow or Off")
- state_condition = OpenScenarioParser.tl_states[tl_state]
- atomic = WaitForTrafficLightState(
- traffic_light, state_condition, name=condition_name)
- elif value_condition.find('TrafficSignalControllerCondition') is not None:
- raise NotImplementedError("ByValue TrafficSignalController conditions are not yet supported")
- else:
- raise AttributeError("Unknown ByValue condition")
- else:
- raise AttributeError("Unknown condition")
- if delay_atomic is not None and atomic is not None:
- new_atomic = py_trees.composites.Sequence("delayed sequence")
- new_atomic.add_child(delay_atomic)
- new_atomic.add_child(atomic)
- else:
- new_atomic = atomic
- return new_atomic
- @staticmethod
- def convert_maneuver_to_atomic(action, actor, actor_list, catalogs):
- """
- Convert an OpenSCENARIO maneuver action into a Behavior atomic
- Note not all OpenSCENARIO actions are currently supported
- """
- maneuver_name = action.attrib.get('name', 'unknown')
- if action.find('GlobalAction') is not None:
- global_action = action.find('GlobalAction')
- if global_action.find('InfrastructureAction') is not None:
- infrastructure_action = global_action.find('InfrastructureAction').find('TrafficSignalAction')
- if infrastructure_action.find('TrafficSignalStateAction') is not None:
- traffic_light_action = infrastructure_action.find('TrafficSignalStateAction')
- name_condition = traffic_light_action.attrib.get('name')
- traffic_light = OpenScenarioParser.get_traffic_light_from_osc_name(name_condition)
- tl_state = traffic_light_action.attrib.get('state').upper()
- if tl_state not in OpenScenarioParser.tl_states:
- raise KeyError("CARLA only supports Green, Red, Yellow or Off")
- traffic_light_state = OpenScenarioParser.tl_states[tl_state]
- atomic = TrafficLightStateSetter(
- traffic_light, traffic_light_state, name=maneuver_name + "_" + str(traffic_light.id))
- else:
- raise NotImplementedError("TrafficLights can only be influenced via TrafficSignalStateAction")
- elif global_action.find('EnvironmentAction') is not None:
- weather_behavior = ChangeWeather(
- OpenScenarioParser.get_weather_from_env_action(global_action, catalogs))
- friction_behavior = ChangeRoadFriction(
- OpenScenarioParser.get_friction_from_env_action(global_action, catalogs))
- env_behavior = py_trees.composites.Parallel(
- policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ALL, name=maneuver_name)
- env_behavior.add_child(
- oneshot_with_check(variable_name=maneuver_name + ">WeatherUpdate", behaviour=weather_behavior))
- env_behavior.add_child(
- oneshot_with_check(variable_name=maneuver_name + ">FrictionUpdate", behaviour=friction_behavior))
- return env_behavior
- elif global_action.find('ParameterAction') is not None:
- parameter_action = global_action.find('ParameterAction')
- parameter_ref = parameter_action.attrib.get('parameterRef')
- if parameter_action.find('ModifyAction') is not None:
- action_rule = parameter_action.find('ModifyAction').find("Rule")
- if action_rule.find("AddValue") is not None:
- rule, value = '+', action_rule.find("AddValue").attrib.get('value')
- else:
- rule, value = '*', action_rule.find("MultiplyByValue").attrib.get('value')
- else:
- rule, value = None, parameter_action.find('SetAction').attrib.get('value')
- atomic = ChangeParameter(parameter_ref, value=ParameterRef(value), rule=rule, name=maneuver_name)
- else:
- raise NotImplementedError("Global actions are not yet supported")
- elif action.find('UserDefinedAction') is not None:
- user_defined_action = action.find('UserDefinedAction')
- if user_defined_action.find('CustomCommandAction') is not None:
- command = user_defined_action.find('CustomCommandAction').attrib.get('type')
- atomic = RunScript(command, base_path=OpenScenarioParser.osc_filepath, name=maneuver_name)
- elif action.find('PrivateAction') is not None:
- private_action = action.find('PrivateAction')
- if private_action.find('LongitudinalAction') is not None:
- private_action = private_action.find('LongitudinalAction')
- if private_action.find('SpeedAction') is not None:
- long_maneuver = private_action.find('SpeedAction')
- # duration and distance
- distance = float('inf')
- duration = float('inf')
- dimension = long_maneuver.find("SpeedActionDynamics").attrib.get('dynamicsDimension')
- if dimension == "distance":
- distance = ParameterRef(long_maneuver.find("SpeedActionDynamics").attrib.get(
- 'value', float("inf")))
- else:
- duration = ParameterRef(long_maneuver.find("SpeedActionDynamics").attrib.get(
- 'value', float("inf")))
- # absolute velocity with given target speed
- if long_maneuver.find("SpeedActionTarget").find("AbsoluteTargetSpeed") is not None:
- target_speed = ParameterRef(long_maneuver.find("SpeedActionTarget").find(
- "AbsoluteTargetSpeed").attrib.get('value', 0))
- atomic = ChangeActorTargetSpeed(
- actor, float(target_speed), distance=distance, duration=duration, name=maneuver_name)
- # relative velocity to given actor
- if long_maneuver.find("SpeedActionTarget").find("RelativeTargetSpeed") is not None:
- relative_speed = long_maneuver.find("SpeedActionTarget").find("RelativeTargetSpeed")
- obj = relative_speed.attrib.get('entityRef')
- value = ParameterRef(relative_speed.attrib.get('value', 0))
- value_type = relative_speed.attrib.get('speedTargetValueType')
- continuous = bool(strtobool(relative_speed.attrib.get('continuous')))
- for traffic_actor in actor_list:
- if (traffic_actor is not None and 'role_name' in traffic_actor.attributes and
- traffic_actor.attributes['role_name'] == obj):
- obj_actor = traffic_actor
- atomic = ChangeActorTargetSpeed(actor,
- target_speed=0.0,
- relative_actor=obj_actor,
- value=value,
- value_type=value_type,
- continuous=continuous,
- distance=distance,
- duration=duration,
- name=maneuver_name)
- elif private_action.find('LongitudinalDistanceAction') is not None:
- long_dist_action = private_action.find("LongitudinalDistanceAction")
- obj = long_dist_action.attrib.get('entityRef')
- for traffic_actor in actor_list:
- if (traffic_actor is not None and 'role_name' in traffic_actor.attributes and
- traffic_actor.attributes['role_name'] == obj):
- obj_actor = traffic_actor
- if "distance" in long_dist_action.attrib and "timeGap" not in long_dist_action.attrib:
- gap_type, gap = 'distance', ParameterRef(long_dist_action.attrib.get('distance'))
- elif "timeGap" in long_dist_action.attrib and "distance" not in long_dist_action.attrib:
- raise NotImplementedError("LongitudinalDistanceAction: timeGap is not implemented")
- else:
- raise ValueError("LongitudinalDistanceAction: " +
- "Please specify any one attribute of [distance, timeGap]")
- constraints = long_dist_action.find('DynamicConstraints')
- max_speed = constraints.attrib.get('maxSpeed', None) if constraints is not None else None
- continues = bool(strtobool(long_dist_action.attrib.get('continuous')))
- freespace = bool(strtobool(long_dist_action.attrib.get('freespace')))
- atomic = KeepLongitudinalGap(actor, reference_actor=obj_actor, gap=gap, gap_type=gap_type,
- max_speed=max_speed, continues=continues, freespace=freespace,
- name=maneuver_name)
- else:
- raise AttributeError("Unknown longitudinal action")
- elif private_action.find('LateralAction') is not None:
- private_action = private_action.find('LateralAction')
- if private_action.find('LaneChangeAction') is not None:
- # Note: LaneChangeActions are currently only supported for RelativeTargetLane
- lat_maneuver = private_action.find('LaneChangeAction')
- target_lane_rel = ParameterRef(lat_maneuver.find("LaneChangeTarget").find(
- "RelativeTargetLane").attrib.get('value', 0))
- direction = "left" if target_lane_rel > 0 else "right"
- lane_changes = abs(target_lane_rel)
- # duration and distance
- distance = float('inf')
- duration = float('inf')
- dimension = lat_maneuver.find("LaneChangeActionDynamics").attrib.get('dynamicsDimension')
- if dimension == "distance":
- distance = ParameterRef(
- lat_maneuver.find("LaneChangeActionDynamics").attrib.get('value', float("inf")))
- else:
- duration = ParameterRef(
- lat_maneuver.find("LaneChangeActionDynamics").attrib.get('value', float("inf")))
- atomic = ChangeActorLateralMotion(actor, direction=direction,
- distance_lane_change=distance,
- distance_other_lane=10,
- lane_changes=lane_changes,
- name=maneuver_name)
- elif private_action.find('LaneOffsetAction') is not None:
- lat_maneuver = private_action.find('LaneOffsetAction')
- continuous = bool(strtobool(lat_maneuver.attrib.get('continuous', "true")))
- # Parsing of the different Dynamic shapes is missing
- lane_target_offset = lat_maneuver.find('LaneOffsetTarget')
- if lane_target_offset.find('AbsoluteTargetLaneOffset') is not None:
- absolute_offset = ParameterRef(
- lane_target_offset.find('AbsoluteTargetLaneOffset').attrib.get('value', 0))
- atomic = ChangeActorLaneOffset(
- actor, absolute_offset, continuous=continuous, name=maneuver_name)
- elif lane_target_offset.find('RelativeTargetLaneOffset') is not None:
- relative_target_offset = lane_target_offset.find('RelativeTargetLaneOffset')
- relative_offset = ParameterRef(relative_target_offset.attrib.get('value', 0))
- relative_actor = None
- relative_actor_name = relative_target_offset.attrib.get('entityRef', None)
- for _actor in actor_list:
- if _actor is not None and 'role_name' in _actor.attributes:
- if relative_actor_name == _actor.attributes['role_name']:
- relative_actor = _actor
- break
- if relative_actor is None:
- raise AttributeError("Cannot find actor '{}' for condition".format(relative_actor_name))
- atomic = ChangeActorLaneOffset(actor, relative_offset, relative_actor,
- continuous=continuous, name=maneuver_name)
- else:
- raise AttributeError("Unknown target offset")
- else:
- raise AttributeError("Unknown lateral action")
- elif private_action.find('VisibilityAction') is not None:
- raise NotImplementedError("Visibility actions are not yet supported")
- elif private_action.find('SynchronizeAction') is not None:
- sync_action = private_action.find('SynchronizeAction')
- master_actor = None
- for actor_ins in actor_list:
- if actor_ins is not None and 'role_name' in actor_ins.attributes:
- if sync_action.attrib.get('masterEntityRef', None) == actor_ins.attributes['role_name']:
- master_actor = actor_ins
- break
- if master_actor is None:
- raise AttributeError("Cannot find actor '{}' for condition".format(
- sync_action.attrib.get('masterEntityRef', None)))
- master_position = OpenScenarioParser.convert_position_to_transform(
- sync_action.find('TargetPositionMaster'))
- position = OpenScenarioParser.convert_position_to_transform(sync_action.find('TargetPosition'))
- if sync_action.find("FinalSpeed").find("AbsoluteSpeed") is not None:
- final_speed = ParameterRef(sync_action.find("FinalSpeed").find(
- "AbsoluteSpeed").attrib.get('value', 0))
- atomic = SyncArrivalOSC(
- actor, master_actor, position, master_position, final_speed, name=maneuver_name)
- # relative velocity to given actor
- elif sync_action.find("FinalSpeed").find("RelativeSpeedToMaster") is not None:
- relative_speed = sync_action.find("FinalSpeed").find("RelativeSpeedToMaster")
- final_speed = ParameterRef(relative_speed.attrib.get('value', 0))
- relative_type = relative_speed.attrib.get('speedTargetValueType')
- atomic = SyncArrivalOSC(
- actor, master_actor, position, master_position, final_speed,
- relative_to_master=True, relative_type=relative_type, name=maneuver_name)
- else:
- raise AttributeError("Unknown speed action")
- elif private_action.find('ActivateControllerAction') is not None:
- private_action = private_action.find('ActivateControllerAction')
- activate = strtobool(private_action.attrib.get('longitudinal'))
- atomic = ChangeAutoPilot(actor, activate, name=maneuver_name)
- elif private_action.find('ControllerAction') is not None:
- controller_action = private_action.find('ControllerAction')
- module, args = OpenScenarioParser.get_controller(controller_action, catalogs)
- atomic = ChangeActorControl(actor, control_py_module=module, args=args,
- scenario_file_path=OpenScenarioParser.osc_filepath)
- elif private_action.find('TeleportAction') is not None:
- teleport_action = private_action.find('TeleportAction')
- position = teleport_action.find('Position')
- atomic = ActorTransformSetterToOSCPosition(actor, position, name=maneuver_name)
- elif private_action.find('RoutingAction') is not None:
- private_action = private_action.find('RoutingAction')
- if private_action.find('AssignRouteAction') is not None:
- # @TODO: How to handle relative positions here? This might chance at runtime?!
- route_action = private_action.find('AssignRouteAction')
- waypoints = OpenScenarioParser.get_route(route_action, catalogs)
- atomic = ChangeActorWaypoints(actor, waypoints=waypoints, name=maneuver_name)
- elif private_action.find('FollowTrajectoryAction') is not None:
- trajectory_action = private_action.find('FollowTrajectoryAction')
- waypoints, times = OpenScenarioParser.get_trajectory(trajectory_action, catalogs)
- atomic = ChangeActorWaypoints(actor, waypoints=list(zip(waypoints, ['shortest'] * len(waypoints))),
- times=times, name=maneuver_name)
- elif private_action.find('AcquirePositionAction') is not None:
- route_action = private_action.find('AcquirePositionAction')
- osc_position = route_action.find('Position')
- waypoints = [(osc_position, 'fastest')]
- atomic = ChangeActorWaypoints(actor, waypoints=waypoints, name=maneuver_name)
- else:
- raise AttributeError("Unknown private routing action")
- else:
- raise AttributeError("Unknown private action")
- else:
- if list(action):
- raise AttributeError("Unknown action: {}".format(maneuver_name))
- else:
- return Idle(duration=0, name=maneuver_name)
- return atomic
|