junction_crossing_route.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. #!/usr/bin/env python
  2. # Copyright (c) 2018-2020 Intel Corporation
  3. #
  4. # This work is licensed under the terms of the MIT license.
  5. # For a copy, see <https://opensource.org/licenses/MIT>.
  6. """
  7. All intersection related scenarios that are part of a route.
  8. """
  9. from __future__ import print_function
  10. import py_trees
  11. from srunner.scenariomanager.scenarioatomics.atomic_behaviors import TrafficLightManipulator
  12. from srunner.scenariomanager.scenarioatomics.atomic_criteria import CollisionTest, DrivenDistanceTest, MaxVelocityTest
  13. from srunner.scenariomanager.scenarioatomics.atomic_trigger_conditions import DriveDistance, WaitEndIntersection
  14. from srunner.scenarios.basic_scenario import BasicScenario
  class SignalJunctionCrossingRoute(BasicScenario):
      """
      Route variant of the signalized junction crossing scenarios.

      When run as part of a route, these scenarios are simplified because the
      background activity supplies the other traffic. To make sure the ego
      vehicle actually interacts with that traffic, the traffic lights are
      manipulated so that two of them are green.
      """

      # Limits and targets applied to the ego vehicle
      _ego_max_velocity_allowed = 20  # Maximum allowed velocity [m/s]
      _ego_expected_driven_distance = 50  # Expected driven distance [m]
      _ego_distance_to_drive = 20  # Allowed distance to drive

      # NOTE(review): within this class the attribute is only ever reset to
      # None in __del__; nothing here assigns a traffic light to it.
      _traffic_light = None

      def __init__(self, world, ego_vehicles, config, randomize=False, debug_mode=False, criteria_enable=True,
                   timeout=180):
          """
          Store the scenario parameters and delegate the setup to BasicScenario.

          `timeout` is the scenario timeout in seconds. `randomize` is accepted
          but not used by this class.
          """
          # Both attributes are assigned before the base-class constructor runs.
          # NOTE(review): presumably the base class builds the behavior tree,
          # which reads self.subtype -- confirm before reordering.
          self.timeout = timeout
          self.subtype = config.subtype

          super(SignalJunctionCrossingRoute, self).__init__(
              "SignalJunctionCrossingRoute",
              ego_vehicles,
              config,
              world,
              debug_mode,
              criteria_enable=criteria_enable)

      def _initialize_actors(self, config):
          """
          Custom actor initialization: this scenario does not create any actors.
          """

      def _create_behavior(self):
          """
          Build the behavior tree of the scenario.

          When close to an intersection, the traffic lights turn green for both
          the ego vehicle and another lane, so that the background activity
          "runs" its red light, creating scenarios 7, 8 and 9. Afterwards the
          scenario waits until the ego vehicle has driven a given distance.

          NOTE(review): an earlier version of this text mentioned a 120 second
          timeout, while the constructor default is 180 -- confirm which applies.
          """
          ego = self.ego_vehicles[0]

          # Ordered steps: manipulate the traffic lights first, then wait for
          # the ego vehicle to cover the configured distance.
          steps = (
              TrafficLightManipulator(ego, self.subtype),
              DriveDistance(ego, self._ego_distance_to_drive, name="DriveDistance"),
          )

          root = py_trees.composites.Sequence("SignalJunctionCrossingRoute")
          for step in steps:
              root.add_child(step)
          return root

      def _create_test_criteria(self):
          """
          Return the list of test criteria that is later used in the parallel
          behavior tree: maximum velocity (optional), collision and driven
          distance, all evaluated on the ego vehicle.
          """
          ego = self.ego_vehicles[0]
          return [
              MaxVelocityTest(ego, self._ego_max_velocity_allowed, optional=True),
              CollisionTest(ego),
              DrivenDistanceTest(ego, self._ego_expected_driven_distance),
          ]

      def __del__(self):
          """
          Drop the traffic light reference and remove all actors upon deletion.
          """
          self._traffic_light = None
          self.remove_all_actors()
  class NoSignalJunctionCrossingRoute(BasicScenario):
      """
      Route variant of the unsignalized junction crossing scenarios.

      When run as part of a route, these scenarios are simplified because the
      background activity supplies the other traffic. For unsignalized
      intersections the scenario only waits until the ego vehicle has left
      the intersection.
      """

      # Limits and targets applied to the ego vehicle
      _ego_max_velocity_allowed = 20  # Maximum allowed velocity [m/s]
      _ego_expected_driven_distance = 50  # Expected driven distance [m]
      _ego_distance_to_drive = 20  # Allowed distance to drive

      def __init__(self, world, ego_vehicles, config, randomize=False, debug_mode=False, criteria_enable=True,
                   timeout=180):
          """
          Store the scenario parameters and delegate the setup to BasicScenario.

          `timeout` is the scenario timeout in seconds. `randomize` is accepted
          but not used by this class.
          """
          # Assigned before the base-class constructor runs, as in the original.
          self.timeout = timeout

          super(NoSignalJunctionCrossingRoute, self).__init__(
              "NoSignalJunctionCrossingRoute",
              ego_vehicles,
              config,
              world,
              debug_mode,
              criteria_enable=criteria_enable)

      def _initialize_actors(self, config):
          """
          Custom actor initialization: this scenario does not create any actors.
          """

      def _create_behavior(self):
          """
          Build the behavior tree of the scenario.

          No traffic light is touched here. The sequence first waits for the
          end of the intersection (WaitEndIntersection) and then until the ego
          vehicle has driven a given distance (DriveDistance).
          """
          ego = self.ego_vehicles[0]

          # Ordered steps: leave the intersection, then cover the configured
          # distance, which ends the scenario.
          steps = (
              WaitEndIntersection(ego, name="WaitEndIntersection"),
              DriveDistance(ego, self._ego_distance_to_drive, name="DriveDistance"),
          )

          root = py_trees.composites.Sequence("NoSignalJunctionCrossingRoute")
          for step in steps:
              root.add_child(step)
          return root

      def _create_test_criteria(self):
          """
          Return the list of test criteria that is later used in the parallel
          behavior tree: maximum velocity (optional), collision and driven
          distance, all evaluated on the ego vehicle.
          """
          ego = self.ego_vehicles[0]
          return [
              MaxVelocityTest(ego, self._ego_max_velocity_allowed, optional=True),
              CollisionTest(ego),
              DrivenDistanceTest(ego, self._ego_expected_driven_distance),
          ]

      def __del__(self):
          """
          Remove all actors upon deletion.
          """
          self.remove_all_actors()