123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- #!/usr/bin/env python
- # This work is licensed under the terms of the MIT license.
- # For a copy, see <https://opensource.org/licenses/MIT>.
- """
- This module provides a human agent to control the ego vehicle via keyboard
- """
- from __future__ import print_function
- import json
- try:
- import pygame
- from pygame.locals import K_DOWN
- from pygame.locals import K_LEFT
- from pygame.locals import K_RIGHT
- from pygame.locals import K_SPACE
- from pygame.locals import K_UP
- from pygame.locals import K_a
- from pygame.locals import K_d
- from pygame.locals import K_s
- from pygame.locals import K_w
- from pygame.locals import K_q
- except ImportError:
- raise RuntimeError('cannot import pygame, make sure pygame package is installed')
- import carla
- from srunner.autoagents.autonomous_agent import AutonomousAgent
class HumanInterface(object):

    """
    Pygame window used to show the camera feed while a vehicle is driven
    manually (debugging aid)
    """

    def __init__(self):
        """
        Initialise pygame and open the 800x600 "Human Agent" window
        """
        # Nothing has been rendered yet
        self._surface = None
        self._width, self._height = 800, 600

        pygame.init()
        pygame.font.init()
        self._clock = pygame.time.Clock()

        window_size = (self._width, self._height)
        window_flags = pygame.HWSURFACE | pygame.DOUBLEBUF
        self._display = pygame.display.set_mode(window_size, window_flags)
        pygame.display.set_caption("Human Agent")

    def run_interface(self, input_data):
        """
        Draw the latest frame of the 'Center' sensor on the window

        :param input_data: mapping of sensor id to sensor reading; the image
            array is taken from index 1 of the 'Center' entry
        """
        raw_frame = input_data['Center'][1]

        # Drop the last channel and reverse the order of the remaining ones
        # (presumably BGRA -> RGB; confirm against the camera sensor output)
        rgb_frame = raw_frame[:, :, -2::-1]

        # The first two axes are swapped before the array is handed to pygame
        frame_surface = pygame.surfarray.make_surface(rgb_frame.swapaxes(0, 1))
        self._surface = frame_surface

        if frame_surface is not None:
            self._display.blit(frame_surface, (0, 0))
        pygame.display.flip()

    def quit_interface(self):
        """
        Shut pygame down, which closes the window
        """
        pygame.quit()
class HumanAgent(AutonomousAgent):

    """
    Human agent to control the ego vehicle via keyboard
    """

    current_control = None
    agent_engaged = False
    prev_timestamp = 0

    def setup(self, path_to_conf_file):
        """
        Setup the agent parameters

        Opens the pygame window and creates the keyboard controller.

        :param path_to_conf_file: forwarded unchanged to KeyboardControl
        """
        self.agent_engaged = False
        self.prev_timestamp = 0

        self._hic = HumanInterface()
        self._controller = KeyboardControl(path_to_conf_file)

    def sensors(self):
        """
        Define the sensor suite required by the agent

        :return: a list with one dictionary per sensor. Every dictionary holds
            the sensor 'type', its 'x', 'y', 'z' placement and an 'id'; the
            camera additionally defines 'roll', 'pitch', 'yaw', 'width',
            'height' and 'fov':

        [
            {'type': 'sensor.camera.rgb', 'x': x_rel, 'y': y_rel, 'z': z_rel,
             'roll': roll, 'pitch': pitch, 'yaw': yaw,
             'width': width, 'height': height, 'fov': fov, 'id': 'Center'},
            {'type': 'sensor.other.gnss', 'x': x_rel, 'y': y_rel, 'z': z_rel,
             'id': 'GPS'}
        ]
        """
        sensors = [
            {'type': 'sensor.camera.rgb', 'x': 0.7, 'y': 0.0, 'z': 1.60, 'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0,
             'width': 800, 'height': 600, 'fov': 100, 'id': 'Center'},
            {'type': 'sensor.other.gnss', 'x': 0.7, 'y': -0.4, 'z': 1.60, 'id': 'GPS'}
        ]

        return sensors

    def run_step(self, input_data, timestamp):
        """
        Execute one step of navigation.

        Renders the current camera frame and asks the keyboard controller for
        the control to apply.

        :param input_data: sensor readings, forwarded to the interface
        :param timestamp: time of the current step; the controller receives the
            difference to the previous step's timestamp
        :return: the control object produced by the keyboard controller
        """
        self.agent_engaged = True
        self._hic.run_interface(input_data)

        control = self._controller.parse_events(timestamp - self.prev_timestamp)
        self.prev_timestamp = timestamp

        return control

    def destroy(self):
        """
        Cleanup: close the pygame window
        """
        # quit_interface is a method and has to be called; assigning to it
        # would only shadow the method and leave the window open.
        # The interface is missing if setup() never ran or failed early.
        interface = getattr(self, '_hic', None)
        if interface is not None:
            interface.quit_interface()
class KeyboardControl(object):

    """
    Keyboard control for the human agent

    Works in one of three modes, selected through the configuration file:
    "normal" (keyboard only, used when no file is given), "log" (keyboard,
    with every control recorded and dumped to a json file on deletion) and
    "playback" (controls are read from a previously recorded json file).
    """

    def __init__(self, path_to_conf_file):
        """
        Init

        :param path_to_conf_file: path to a text file whose first line holds
            the mode and whose second line holds the json file to write to or
            read from, each as the second space-separated token of the line.
            A falsy value selects the "normal" mode.
        """
        self._control = carla.VehicleControl()
        self._steer_cache = 0.0
        self._clock = pygame.time.Clock()

        # Get the mode
        if path_to_conf_file:

            with open(path_to_conf_file, "r") as f:
                lines = f.read().split("\n")
                self._mode = lines[0].split(" ")[1]
                self._endpoint = lines[1].split(" ")[1]

            # Get the needed vars
            if self._mode == "log":
                self._log_data = {'records': []}

            elif self._mode == "playback":
                self._index = 0
                self._control_list = []

                with open(self._endpoint) as fd:
                    try:
                        self._records = json.load(fd)
                        self._json_to_control()
                    except ValueError as err:
                        # Moving to Python 3.5+ this can be replaced with json.JSONDecodeError
                        # Keep going with whatever was parsed so far, but
                        # tell the user why the playback is (partly) empty
                        print("Could not parse the playback file '{}': {}".format(self._endpoint, err))
        else:
            self._mode = "normal"
            self._endpoint = None

    def _json_to_control(self):
        """
        Parses the json file into a list of carla.VehicleControl
        """

        # transform strs into VehicleControl commands
        for entry in self._records['records']:
            recorded = entry['control']
            control = carla.VehicleControl(throttle=recorded['throttle'],
                                           steer=recorded['steer'],
                                           brake=recorded['brake'],
                                           hand_brake=recorded['hand_brake'],
                                           reverse=recorded['reverse'],
                                           manual_gear_shift=recorded['manual_gear_shift'],
                                           gear=recorded['gear'])
            self._control_list.append(control)

    def parse_events(self, timestamp):
        """
        Parse the keyboard events and set the vehicle controls accordingly

        :param timestamp: time elapsed since the previous call; it is scaled
            by 1000 before being used as the milliseconds of the steering ramp
        :return: the control to apply during this step
        """
        # Move the vehicle
        if self._mode == "playback":
            self._parse_json_control()
        else:
            self._parse_vehicle_keys(pygame.key.get_pressed(), timestamp * 1000)

        # Record the control
        if self._mode == "log":
            self._record_control()

        return self._control

    def _parse_vehicle_keys(self, keys, milliseconds):
        """
        Calculate new vehicle controls based on input keys

        :param keys: pressed state of the keys, indexable by pygame key constant
        :param milliseconds: elapsed time used to scale the steering increment
        """
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                # Stop here; the controls keep their previous values
                return
            elif event.type == pygame.KEYUP:
                if event.key == K_q:
                    # Releasing 'q' toggles between forward and reverse gear
                    self._control.gear = 1 if self._control.reverse else -1
                    self._control.reverse = self._control.gear < 0

        if keys[K_UP] or keys[K_w]:
            self._control.throttle = 0.6
        else:
            self._control.throttle = 0.0

        # Steering builds up while a key is held and snaps back to the
        # centre as soon as it is released
        steer_increment = 3e-4 * milliseconds
        if keys[K_LEFT] or keys[K_a]:
            self._steer_cache -= steer_increment
        elif keys[K_RIGHT] or keys[K_d]:
            self._steer_cache += steer_increment
        else:
            self._steer_cache = 0.0

        self._steer_cache = min(0.95, max(-0.95, self._steer_cache))
        self._control.steer = round(self._steer_cache, 1)
        self._control.brake = 1.0 if keys[K_DOWN] or keys[K_s] else 0.0
        self._control.hand_brake = keys[K_SPACE]

    def _parse_json_control(self):
        """
        Gets the control corresponding to the current frame

        Once the recorded list is exhausted the last control stays active.
        """
        if self._index < len(self._control_list):
            self._control = self._control_list[self._index]
            self._index += 1
        else:
            print("JSON file has no more entries")

    def _record_control(self):
        """
        Appends the current control to the list that is saved as json
        """
        new_record = {
            'control': {
                'throttle': self._control.throttle,
                'steer': self._control.steer,
                'brake': self._control.brake,
                'hand_brake': self._control.hand_brake,
                'reverse': self._control.reverse,
                'manual_gear_shift': self._control.manual_gear_shift,
                'gear': self._control.gear
            }
        }

        self._log_data['records'].append(new_record)

    def __del__(self):
        """
        Delete method: in "log" mode, dump the recorded controls to the json file
        """
        # __init__ may have failed before these attributes were created (for
        # example when the configuration file is missing or malformed), and
        # the finalizer still runs on such a half-built instance.
        mode = getattr(self, '_mode', None)
        log_data = getattr(self, '_log_data', None)

        # Get ready to log user commands
        if mode == "log" and log_data:
            with open(self._endpoint, 'w') as fd:
                json.dump(log_data, fd, indent=4, sort_keys=True)
|