control_loss.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. #!/usr/bin/env python
  2. #
  3. # This work is licensed under the terms of the MIT license.
  4. # For a copy, see <https://opensource.org/licenses/MIT>.
  5. """
  6. Control Loss Vehicle scenario:
  7. The scenario realizes that the vehicle looses control due to
  8. bad road conditions, etc. and checks to see if the vehicle
  9. regains control and corrects it's course.
  10. """
  11. from numpy import random
  12. import py_trees
  13. import carla
  14. from srunner.scenariomanager.carla_data_provider import CarlaDataProvider
  15. from srunner.scenariomanager.scenarioatomics.atomic_behaviors import ChangeNoiseParameters, ActorTransformSetter
  16. from srunner.scenariomanager.scenarioatomics.atomic_criteria import CollisionTest
  17. from srunner.scenariomanager.scenarioatomics.atomic_trigger_conditions import (InTriggerDistanceToLocation,
  18. InTriggerDistanceToNextIntersection,
  19. DriveDistance)
  20. from srunner.scenarios.basic_scenario import BasicScenario
  21. from srunner.tools.scenario_helper import get_location_in_distance_from_wp
  22. class ControlLoss(BasicScenario):
  23. """
  24. Implementation of "Control Loss Vehicle" (Traffic Scenario 01)
  25. This is a single ego vehicle scenario
  26. """
  27. def __init__(self, world, ego_vehicles, config, randomize=False, debug_mode=False, criteria_enable=True,
  28. timeout=60):
  29. """
  30. Setup all relevant parameters and create scenario
  31. """
  32. # ego vehicle parameters
  33. self._no_of_jitter = 10
  34. self._noise_mean = 0 # Mean value of steering noise
  35. self._noise_std = 0.01 # Std. deviation of steering noise
  36. self._dynamic_mean_for_steer = 0.001
  37. self._dynamic_mean_for_throttle = 0.045
  38. self._abort_distance_to_intersection = 10
  39. self._current_steer_noise = [0] # This is a list, since lists are mutable
  40. self._current_throttle_noise = [0]
  41. self._start_distance = 20
  42. self._trigger_dist = 2
  43. self._end_distance = 30
  44. self._ego_vehicle_max_steer = 0.0
  45. self._ego_vehicle_max_throttle = 1.0
  46. self._ego_vehicle_target_velocity = 15
  47. self._map = CarlaDataProvider.get_map()
  48. # Timeout of scenario in seconds
  49. self.timeout = timeout
  50. # The reference trigger for the control loss
  51. self._reference_waypoint = self._map.get_waypoint(config.trigger_points[0].location)
  52. self.loc_list = []
  53. self.obj = []
  54. self._randomize = randomize
  55. super(ControlLoss, self).__init__("ControlLoss",
  56. ego_vehicles,
  57. config,
  58. world,
  59. debug_mode,
  60. criteria_enable=criteria_enable)
  61. def _initialize_actors(self, config):
  62. """
  63. Custom initialization
  64. """
  65. if self._randomize:
  66. self._distance = random.randint(low=10, high=80, size=3)
  67. self._distance = sorted(self._distance)
  68. else:
  69. self._distance = [14, 48, 74]
  70. first_loc, _ = get_location_in_distance_from_wp(self._reference_waypoint, self._distance[0])
  71. second_loc, _ = get_location_in_distance_from_wp(self._reference_waypoint, self._distance[1])
  72. third_loc, _ = get_location_in_distance_from_wp(self._reference_waypoint, self._distance[2])
  73. self.loc_list.extend([first_loc, second_loc, third_loc])
  74. self._dist_prop = [x - 2 for x in self._distance]
  75. self.first_loc_prev, _ = get_location_in_distance_from_wp(self._reference_waypoint, self._dist_prop[0])
  76. self.sec_loc_prev, _ = get_location_in_distance_from_wp(self._reference_waypoint, self._dist_prop[1])
  77. self.third_loc_prev, _ = get_location_in_distance_from_wp(self._reference_waypoint, self._dist_prop[2])
  78. self.first_transform = carla.Transform(self.first_loc_prev)
  79. self.sec_transform = carla.Transform(self.sec_loc_prev)
  80. self.third_transform = carla.Transform(self.third_loc_prev)
  81. self.first_transform = carla.Transform(carla.Location(self.first_loc_prev.x,
  82. self.first_loc_prev.y,
  83. self.first_loc_prev.z))
  84. self.sec_transform = carla.Transform(carla.Location(self.sec_loc_prev.x,
  85. self.sec_loc_prev.y,
  86. self.sec_loc_prev.z))
  87. self.third_transform = carla.Transform(carla.Location(self.third_loc_prev.x,
  88. self.third_loc_prev.y,
  89. self.third_loc_prev.z))
  90. first_debris = CarlaDataProvider.request_new_actor('static.prop.dirtdebris01', self.first_transform, 'prop')
  91. second_debris = CarlaDataProvider.request_new_actor('static.prop.dirtdebris01', self.sec_transform, 'prop')
  92. third_debris = CarlaDataProvider.request_new_actor('static.prop.dirtdebris01', self.third_transform, 'prop')
  93. first_debris.set_transform(self.first_transform)
  94. second_debris.set_transform(self.sec_transform)
  95. third_debris.set_transform(self.third_transform)
  96. self.obj.extend([first_debris, second_debris, third_debris])
  97. for debris in self.obj:
  98. debris.set_simulate_physics(False)
  99. self.other_actors.append(first_debris)
  100. self.other_actors.append(second_debris)
  101. self.other_actors.append(third_debris)
  102. def _create_behavior(self):
  103. """
  104. The scenario defined after is a "control loss vehicle" scenario. After
  105. invoking this scenario, it will wait until the vehicle drove a few meters
  106. (_start_distance), and then perform a jitter action. Finally, the vehicle
  107. has to reach a target point (_end_distance). If this does not happen within
  108. 60 seconds, a timeout stops the scenario
  109. """
  110. # start condition
  111. start_end_parallel = py_trees.composites.Parallel("Jitter",
  112. policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE)
  113. start_condition = InTriggerDistanceToLocation(self.ego_vehicles[0], self.first_loc_prev, self._trigger_dist)
  114. for _ in range(self._no_of_jitter):
  115. # change the current noise to be applied
  116. turn = ChangeNoiseParameters(self._current_steer_noise, self._current_throttle_noise,
  117. self._noise_mean, self._noise_std, self._dynamic_mean_for_steer,
  118. self._dynamic_mean_for_throttle) # Mean value of steering noise
  119. # Noise end! put again the added noise to zero.
  120. noise_end = ChangeNoiseParameters(self._current_steer_noise, self._current_throttle_noise,
  121. 0, 0, 0, 0)
  122. jitter_action = py_trees.composites.Parallel("Jitter",
  123. policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE)
  124. # Abort jitter_sequence, if the vehicle is approaching an intersection
  125. jitter_abort = InTriggerDistanceToNextIntersection(self.ego_vehicles[0], self._abort_distance_to_intersection)
  126. # endcondition: Check if vehicle reached waypoint _end_distance from here:
  127. end_condition = DriveDistance(self.ego_vehicles[0], self._end_distance)
  128. start_end_parallel.add_child(start_condition)
  129. start_end_parallel.add_child(end_condition)
  130. # Build behavior tree
  131. sequence = py_trees.composites.Sequence("ControlLoss")
  132. sequence.add_child(ActorTransformSetter(self.other_actors[0], self.first_transform, physics=False))
  133. sequence.add_child(ActorTransformSetter(self.other_actors[1], self.sec_transform, physics=False))
  134. sequence.add_child(ActorTransformSetter(self.other_actors[2], self.third_transform, physics=False))
  135. jitter = py_trees.composites.Sequence("Jitter Behavior")
  136. jitter.add_child(turn)
  137. jitter.add_child(InTriggerDistanceToLocation(self.ego_vehicles[0], self.sec_loc_prev, self._trigger_dist))
  138. jitter.add_child(turn)
  139. jitter.add_child(InTriggerDistanceToLocation(self.ego_vehicles[0], self.third_loc_prev, self._trigger_dist))
  140. jitter.add_child(turn)
  141. jitter_action.add_child(jitter)
  142. jitter_action.add_child(jitter_abort)
  143. sequence.add_child(start_end_parallel)
  144. sequence.add_child(jitter_action)
  145. sequence.add_child(end_condition)
  146. sequence.add_child(noise_end)
  147. return sequence
  148. def _create_test_criteria(self):
  149. """
  150. A list of all test criteria will be created that is later used
  151. in parallel behavior tree.
  152. """
  153. criteria = []
  154. collision_criterion = CollisionTest(self.ego_vehicles[0])
  155. criteria.append(collision_criterion)
  156. return criteria
  157. def change_control(self, control):
  158. """
  159. This is a function that changes the control based on the scenario determination
  160. :param control: a carla vehicle control
  161. :return: a control to be changed by the scenario.
  162. """
  163. control.steer += self._current_steer_noise[0]
  164. control.throttle += self._current_throttle_noise[0]
  165. return control
  166. def __del__(self):
  167. """
  168. Remove all actors upon deletion
  169. """
  170. self.remove_all_actors()