12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049 |
- #!/usr/bin/env python
- # Copyright (c) 2019 Computer Vision Center (CVC) at the Universitat Autonoma de
- # Barcelona (UAB).
- #
- # This work is licensed under the terms of the MIT license.
- # For a copy, see <https://opensource.org/licenses/MIT>.
- # Allows controlling a vehicle with a keyboard. For a simpler and more
- # documented example, please take a look at tutorial.py.
- """
- Welcome to CARLA manual control.
- Use ARROWS or WASD keys for control.
- W : throttle
- S : brake
- A/D : steer left/right
- Q : toggle reverse
- Space : hand-brake
- P : toggle autopilot
- M : toggle manual transmission
- ,/. : gear up/down
- L : toggle next light type
- SHIFT + L : toggle high beam
- Z/X : toggle right/left blinker
- I : toggle interior light
- TAB : change camera position
- R : toggle recording images to disk
- CTRL + R : toggle recording of simulation (replacing any previous)
- CTRL + P : start replaying last recorded simulation
- CTRL + + : increments the start time of the replay by 1 second (+SHIFT = 10 seconds)
- CTRL + - : decrements the start time of the replay by 1 second (+SHIFT = 10 seconds)
- F1 : toggle HUD
- H/? : toggle help
- ESC : quit
- """
- from __future__ import print_function
- # ==============================================================================
- # -- imports -------------------------------------------------------------------
- # ==============================================================================
- import carla
- from carla import ColorConverter as cc
- import argparse
- import os
- import sys
- import time
- import collections
- import datetime
- import logging
- import math
- import weakref
- import csv
- import random
- try:
- import pygame
- from pygame.locals import K_ESCAPE
- from pygame.locals import K_F1
- from pygame.locals import KMOD_CTRL
- from pygame.locals import KMOD_SHIFT
- from pygame.locals import K_BACKSPACE
- from pygame.locals import K_TAB
- from pygame.locals import K_SPACE
- from pygame.locals import K_UP
- from pygame.locals import K_DOWN
- from pygame.locals import K_LEFT
- from pygame.locals import K_RIGHT
- from pygame.locals import K_w
- from pygame.locals import K_a
- from pygame.locals import K_s
- from pygame.locals import K_d
- from pygame.locals import K_q
- from pygame.locals import K_m
- from pygame.locals import K_COMMA
- from pygame.locals import K_PERIOD
- from pygame.locals import K_p
- from pygame.locals import K_i
- from pygame.locals import K_l
- from pygame.locals import K_z
- from pygame.locals import K_x
- from pygame.locals import K_r
- from pygame.locals import K_MINUS
- from pygame.locals import K_EQUALS
- except ImportError:
- raise RuntimeError('cannot import pygame, make sure pygame package is installed')
- try:
- import numpy as np
- except ImportError:
- raise RuntimeError('cannot import numpy, make sure numpy package is installed')
- number_record = 0
- rannumber = random.randint(10000000, 90000000)
- # ==============================================================================
- # -- Global functions ----------------------------------------------------------
- # ==============================================================================
- def get_actor_display_name(actor, truncate=250):
- name = ' '.join(actor.type_id.replace('_', '.').title().split('.')[1:])
- return (name[:truncate - 1] + u'\u2026') if len(name) > truncate else name
- # ==============================================================================
- # -- World ---------------------------------------------------------------------
- # ==============================================================================
- class World(object):
- restarted = False
- def __init__(self, carla_world, hud, args):
- self.world = carla_world
- try:
- self.map = self.world.get_map()
- except RuntimeError as error:
- print('RuntimeError: {}'.format(error))
- print(' The server could not send the OpenDRIVE (.xodr) file:')
- print(' Make sure it exists, has the same name of your town, and is correct.')
- sys.exit(1)
- self.hud = hud
- self.player = None
- self.collision_sensor = None
- self.lane_invasion_sensor = None
- self.gnss_sensor = None
- self.imu_sensor = None
- self.radar_sensor = None
- self.camera_manager = None
- self.restart()
- self.world.on_tick(hud.on_world_tick)
- self.recording_enabled = False
- self.recording_start = 0
- def restart(self):
- if self.restarted:
- return
- self.restarted = True
- self.player_max_speed = 1.589
- self.player_max_speed_fast = 3.713
- # Keep same camera config if the camera manager exists.
- cam_index = self.camera_manager.index if self.camera_manager is not None else 0
- cam_pos_index = self.camera_manager.transform_index if self.camera_manager is not None else 1
- # Get the ego vehicle
- while self.player is None:
- print("Waiting for the ego vehicle...")
- time.sleep(1)
- possible_vehicles = self.world.get_actors().filter('vehicle.*')
- for vehicle in possible_vehicles:
- if vehicle.attributes['role_name'] == 'hero':
- print("Ego vehicle found")
- self.player = vehicle
- # print(vehicle.get_control().steer)
- break
- self.player_name = self.player.type_id
- # Set up the sensors.
- self.collision_sensor = CollisionSensor(self.player, self.hud)
- self.lane_invasion_sensor = LaneInvasionSensor(self.player, self.hud)
- self.gnss_sensor = GnssSensor(self.player)
- self.imu_sensor = IMUSensor(self.player)
- self.camera_manager = CameraManager(self.player, self.hud)
- self.camera_manager.transform_index = cam_pos_index
- self.camera_manager.set_sensor(cam_index, self.player, notify=False)
- actor_type = get_actor_display_name(self.player)
- self.hud.notification(actor_type)
- self.world.wait_for_tick()
- def tick(self, clock):
- if len(self.world.get_actors().filter(self.player_name)) < 1:
- return False
- self.hud.tick(self, clock)
- return True
- def render(self, display):
- self.camera_manager.render(display)
- self.hud.render(display)
- def destroy_sensors(self):
- self.camera_manager.sensor.destroy()
- self.camera_manager.sensor = None
- self.camera_manager.index = None
- def destroy(self):
- sensors = [
- self.camera_manager.sensor,
- self.collision_sensor.sensor,
- self.lane_invasion_sensor.sensor,
- self.gnss_sensor.sensor,
- self.imu_sensor.sensor]
- for sensor in sensors:
- if sensor is not None:
- sensor.stop()
- sensor.destroy()
- if self.player is not None:
- self.player.destroy()
- # ==============================================================================
- # -- KeyboardControl -----------------------------------------------------------
- # ==============================================================================
- class KeyboardControl(object):
- """Class that handles keyboard input."""
- def __init__(self, world, start_in_autopilot):
- self._autopilot_enabled = start_in_autopilot
- self._control = carla.VehicleControl()
- self._lights = carla.VehicleLightState.NONE
- self._steer_cache = 0.0
- world.player.set_autopilot(self._autopilot_enabled)
- if (self._autopilot_enabled == True):
- world.camera_manager.toggle_recording()
- world.player.set_light_state(self._lights)
- world.hud.notification("Press 'H' or '?' for help.", seconds=4.0)
- def parse_events(self, client, world, clock):
- current_lights = self._lights
- for event in pygame.event.get():
- if event.type == pygame.QUIT:
- return True
- elif event.type == pygame.KEYUP:
- if self._is_quit_shortcut(event.key):
- return True
- elif event.key == K_BACKSPACE:
- if self._autopilot_enabled:
- world.player.set_autopilot(False)
- world.restart()
- world.player.set_autopilot(True)
- else:
- world.restart()
- elif event.key == K_F1:
- world.hud.toggle_info()
- elif event.key == K_TAB:
- world.camera_manager.toggle_camera(world.player)
- elif event.key == K_r and not (pygame.key.get_mods() & KMOD_CTRL):
- world.camera_manager.toggle_recording()
- elif event.key == K_r and (pygame.key.get_mods() & KMOD_CTRL):
- if (world.recording_enabled):
- client.stop_recorder()
- world.recording_enabled = False
- world.hud.notification("Recorder is OFF")
- else:
- client.start_recorder("manual_recording.rec")
- world.recording_enabled = True
- world.hud.notification("Recorder is ON")
- elif event.key == K_p and (pygame.key.get_mods() & KMOD_CTRL):
- # stop recorder
- client.stop_recorder()
- world.recording_enabled = False
- # work around to fix camera at start of replaying
- current_index = world.camera_manager.index
- world.destroy_sensors()
- # disable autopilot
- self._autopilot_enabled = False
- world.player.set_autopilot(self._autopilot_enabled)
- world.hud.notification("Replaying file 'manual_recording.rec'")
- # replayer
- client.replay_file("manual_recording.rec", world.recording_start, 0, 0)
- world.camera_manager.set_sensor(current_index)
- elif event.key == K_MINUS and (pygame.key.get_mods() & KMOD_CTRL):
- if pygame.key.get_mods() & KMOD_SHIFT:
- world.recording_start -= 10
- else:
- world.recording_start -= 1
- world.hud.notification("Recording start time is %d" % (world.recording_start))
- elif event.key == K_EQUALS and (pygame.key.get_mods() & KMOD_CTRL):
- if pygame.key.get_mods() & KMOD_SHIFT:
- world.recording_start += 10
- else:
- world.recording_start += 1
- world.hud.notification("Recording start time is %d" % (world.recording_start))
- elif event.key == K_q:
- self._control.gear = 1 if self._control.reverse else -1
- elif event.key == K_m:
- self._control.manual_gear_shift = not self._control.manual_gear_shift
- self._control.gear = world.player.get_control().gear
- world.hud.notification('%s Transmission' %
- ('Manual' if self._control.manual_gear_shift else 'Automatic'))
- elif self._control.manual_gear_shift and event.key == K_COMMA:
- self._control.gear = max(-1, self._control.gear - 1)
- elif self._control.manual_gear_shift and event.key == K_PERIOD:
- self._control.gear = self._control.gear + 1
- elif event.key == K_p and not pygame.key.get_mods() & KMOD_CTRL:
- self._autopilot_enabled = not self._autopilot_enabled
- world.player.set_autopilot(self._autopilot_enabled)
- world.hud.notification(
- 'Autopilot %s' % ('On' if self._autopilot_enabled else 'Off'))
- world.camera_manager.toggle_recording()
- elif event.key == K_l and pygame.key.get_mods() & KMOD_CTRL:
- current_lights ^= carla.VehicleLightState.Special1
- elif event.key == K_l and pygame.key.get_mods() & KMOD_SHIFT:
- current_lights ^= carla.VehicleLightState.HighBeam
- elif event.key == K_l:
- # Use 'L' key to switch between lights:
- # closed -> position -> low beam -> fog
- if not self._lights & carla.VehicleLightState.Position:
- world.hud.notification("Position lights")
- current_lights |= carla.VehicleLightState.Position
- else:
- world.hud.notification("Low beam lights")
- current_lights |= carla.VehicleLightState.LowBeam
- if self._lights & carla.VehicleLightState.LowBeam:
- world.hud.notification("Fog lights")
- current_lights |= carla.VehicleLightState.Fog
- if self._lights & carla.VehicleLightState.Fog:
- world.hud.notification("Lights off")
- current_lights ^= carla.VehicleLightState.Position
- current_lights ^= carla.VehicleLightState.LowBeam
- current_lights ^= carla.VehicleLightState.Fog
- elif event.key == K_i:
- current_lights ^= carla.VehicleLightState.Interior
- elif event.key == K_z:
- current_lights ^= carla.VehicleLightState.LeftBlinker
- elif event.key == K_x:
- current_lights ^= carla.VehicleLightState.RightBlinker
- if not self._autopilot_enabled:
- self._parse_vehicle_keys(pygame.key.get_pressed(), clock.get_time())
- self._control.reverse = self._control.gear < 0
- # Set automatic control-related vehicle lights
- if self._control.brake:
- current_lights |= carla.VehicleLightState.Brake
- else: # Remove the Brake flag
- current_lights &= ~carla.VehicleLightState.Brake
- if self._control.reverse:
- current_lights |= carla.VehicleLightState.Reverse
- else: # Remove the Reverse flag
- current_lights &= ~carla.VehicleLightState.Reverse
- if current_lights != self._lights: # Change the light state only if necessary
- self._lights = current_lights
- world.player.set_light_state(carla.VehicleLightState(self._lights))
- world.player.apply_control(self._control)
- def _parse_vehicle_keys(self, keys, milliseconds):
- if keys[K_UP] or keys[K_w]:
- self._control.throttle = min(self._control.throttle + 0.1, 1.00)
- else:
- self._control.throttle = 0.0
- if keys[K_DOWN] or keys[K_s]:
- self._control.brake = min(self._control.brake + 0.2, 1)
- else:
- self._control.brake = 0
- steer_increment = 5e-4 * milliseconds
- if keys[K_LEFT] or keys[K_a]:
- if self._steer_cache > 0:
- self._steer_cache = 0
- else:
- self._steer_cache -= steer_increment
- elif keys[K_RIGHT] or keys[K_d]:
- if self._steer_cache < 0:
- self._steer_cache = 0
- else:
- self._steer_cache += steer_increment
- else:
- self._steer_cache = 0.0
- self._steer_cache = min(1.0, max(-1.0, self._steer_cache))
- self._control.steer = round(self._steer_cache, 1)
- self._control.hand_brake = keys[K_SPACE]
- @staticmethod
- def _is_quit_shortcut(key):
- return (key == K_ESCAPE) or (key == K_q and pygame.key.get_mods() & KMOD_CTRL)
- # ==============================================================================
- # -- HUD -----------------------------------------------------------------------
- # ==============================================================================
- class HUD(object):
- def __init__(self, width, height):
- self.dim = (width, height)
- font = pygame.font.Font(pygame.font.get_default_font(), 20)
- font_name = 'courier' if os.name == 'nt' else 'mono'
- fonts = [x for x in pygame.font.get_fonts() if font_name in x]
- default_font = 'ubuntumono'
- mono = default_font if default_font in fonts else fonts[0]
- mono = pygame.font.match_font(mono)
- self._font_mono = pygame.font.Font(mono, 12 if os.name == 'nt' else 14)
- self._notifications = FadingText(font, (width, 40), (0, height - 40))
- self.help = HelpText(pygame.font.Font(mono, 16), width, height)
- self.server_fps = 0
- self.frame = 0
- self.simulation_time = 0
- self._show_info = True
- self._info_text = []
- self._server_clock = pygame.time.Clock()
- def on_world_tick(self, timestamp):
- self._server_clock.tick()
- self.server_fps = self._server_clock.get_fps()
- self.frame = timestamp.frame
- self.simulation_time = timestamp.elapsed_seconds
- def tick(self, world, clock):
- self._notifications.tick(world, clock)
- if not self._show_info:
- return
- t = world.player.get_transform()
- v = world.player.get_velocity()
- c = world.player.get_control()
- # print(c.steer)
- compass = world.imu_sensor.compass
- heading = 'N' if compass > 270.5 or compass < 89.5 else ''
- heading += 'S' if 90.5 < compass < 269.5 else ''
- heading += 'E' if 0.5 < compass < 179.5 else ''
- heading += 'W' if 180.5 < compass < 359.5 else ''
- colhist = world.collision_sensor.get_collision_history()
- collision = [colhist[x + self.frame - 200] for x in range(0, 200)]
- max_col = max(1.0, max(collision))
- collision = [x / max_col for x in collision]
- vehicles = world.world.get_actors().filter('vehicle.*')
- self._info_text = [
- 'Server: % 16.0f FPS' % self.server_fps,
- 'Client: % 16.0f FPS' % clock.get_fps(),
- '',
- 'Vehicle: % 20s' % get_actor_display_name(world.player, truncate=20),
- 'Map: % 20s' % world.map.name.split('/')[-1],
- 'Simulation time: % 12s' % datetime.timedelta(seconds=int(self.simulation_time)),
- '',
- 'Speed: % 15.0f km/h' % (3.6 * math.sqrt(v.x ** 2 + v.y ** 2 + v.z ** 2)),
- u'Compass:% 17.0f\N{DEGREE SIGN} % 2s' % (compass, heading),
- 'Accelero: (%5.1f,%5.1f,%5.1f)' % (world.imu_sensor.accelerometer),
- 'Gyroscop: (%5.1f,%5.1f,%5.1f)' % (world.imu_sensor.gyroscope),
- 'Location:% 20s' % ('(% 5.1f, % 5.1f)' % (t.location.x, t.location.y)),
- 'GNSS:% 24s' % ('(% 2.6f, % 3.6f)' % (world.gnss_sensor.lat, world.gnss_sensor.lon)),
- 'Height: % 18.0f m' % t.location.z,
- '']
- self._info_text += [
- ('Throttle:', c.throttle, 0.0, 1.0),
- ('Steer:', c.steer, -1.0, 1.0),
- ('Brake:', c.brake, 0.0, 1.0),
- ('Reverse:', c.reverse),
- ('Hand brake:', c.hand_brake),
- ('Manual:', c.manual_gear_shift),
- 'Gear: %s' % {-1: 'R', 0: 'N'}.get(c.gear, c.gear)]
- self._info_text += [
- '',
- 'Collision:',
- collision,
- '',
- 'Number of vehicles: % 8d' % len(vehicles)]
- if len(vehicles) > 1:
- self._info_text += ['Nearby vehicles:']
- distance = lambda l: math.sqrt(
- (l.x - t.location.x) ** 2 + (l.y - t.location.y) ** 2 + (l.z - t.location.z) ** 2)
- vehicles = [(distance(x.get_location()), x) for x in vehicles if x.id != world.player.id]
- for d, vehicle in sorted(vehicles, key=lambda vehicles: vehicles[0]):
- if d > 200.0:
- break
- vehicle_type = get_actor_display_name(vehicle, truncate=22)
- self._info_text.append('% 4dm %s' % (d, vehicle_type))
- def toggle_info(self):
- self._show_info = not self._show_info
- def notification(self, text, seconds=2.0):
- self._notifications.set_text(text, seconds=seconds)
- def error(self, text):
- self._notifications.set_text('Error: %s' % text, (255, 0, 0))
- def render(self, display):
- if self._show_info:
- info_surface = pygame.Surface((220, self.dim[1]))
- info_surface.set_alpha(100)
- display.blit(info_surface, (0, 0))
- v_offset = 4
- bar_h_offset = 100
- bar_width = 106
- for item in self._info_text:
- if v_offset + 18 > self.dim[1]:
- break
- if isinstance(item, list):
- if len(item) > 1:
- points = [(x + 8, v_offset + 8 + (1.0 - y) * 30) for x, y in enumerate(item)]
- pygame.draw.lines(display, (255, 136, 0), False, points, 2)
- item = None
- v_offset += 18
- elif isinstance(item, tuple):
- if isinstance(item[1], bool):
- rect = pygame.Rect((bar_h_offset, v_offset + 8), (6, 6))
- pygame.draw.rect(display, (255, 255, 255), rect, 0 if item[1] else 1)
- else:
- rect_border = pygame.Rect((bar_h_offset, v_offset + 8), (bar_width, 6))
- pygame.draw.rect(display, (255, 255, 255), rect_border, 1)
- f = (item[1] - item[2]) / (item[3] - item[2])
- if item[2] < 0.0:
- rect = pygame.Rect((bar_h_offset + f * (bar_width - 6), v_offset + 8), (6, 6))
- else:
- rect = pygame.Rect((bar_h_offset, v_offset + 8), (f * bar_width, 6))
- pygame.draw.rect(display, (255, 255, 255), rect)
- item = item[0]
- if item: # At this point has to be a str.
- surface = self._font_mono.render(item, True, (255, 255, 255))
- display.blit(surface, (8, v_offset))
- v_offset += 18
- self._notifications.render(display)
- self.help.render(display)
- # ==============================================================================
- # -- FadingText ----------------------------------------------------------------
- # ==============================================================================
- class FadingText(object):
- def __init__(self, font, dim, pos):
- self.font = font
- self.dim = dim
- self.pos = pos
- self.seconds_left = 0
- self.surface = pygame.Surface(self.dim)
- def set_text(self, text, color=(255, 255, 255), seconds=2.0):
- text_texture = self.font.render(text, True, color)
- self.surface = pygame.Surface(self.dim)
- self.seconds_left = seconds
- self.surface.fill((0, 0, 0, 0))
- self.surface.blit(text_texture, (10, 11))
- def tick(self, _, clock):
- delta_seconds = 1e-3 * clock.get_time()
- self.seconds_left = max(0.0, self.seconds_left - delta_seconds)
- self.surface.set_alpha(500.0 * self.seconds_left)
- def render(self, display):
- display.blit(self.surface, self.pos)
- # ==============================================================================
- # -- HelpText ------------------------------------------------------------------
- # ==============================================================================
- class HelpText(object):
- """Helper class to handle text output using pygame"""
- def __init__(self, font, width, height):
- lines = __doc__.split('\n')
- self.font = font
- self.line_space = 18
- self.dim = (780, len(lines) * self.line_space + 12)
- self.pos = (0.5 * width - 0.5 * self.dim[0], 0.5 * height - 0.5 * self.dim[1])
- self.seconds_left = 0
- self.surface = pygame.Surface(self.dim)
- self.surface.fill((0, 0, 0, 0))
- for n, line in enumerate(lines):
- text_texture = self.font.render(line, True, (255, 255, 255))
- self.surface.blit(text_texture, (22, n * self.line_space))
- self._render = False
- self.surface.set_alpha(220)
- def toggle(self):
- self._render = not self._render
- def render(self, display):
- if self._render:
- display.blit(self.surface, self.pos)
- # ==============================================================================
- # -- CollisionSensor -----------------------------------------------------------
- # ==============================================================================
- class CollisionSensor(object):
- def __init__(self, parent_actor, hud):
- self.sensor = None
- self.history = []
- self._parent = parent_actor
- self.hud = hud
- world = self._parent.get_world()
- bp = world.get_blueprint_library().find('sensor.other.collision')
- self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent)
- # We need to pass the lambda a weak reference to self to avoid circular
- # reference.
- weak_self = weakref.ref(self)
- self.sensor.listen(lambda event: CollisionSensor._on_collision(weak_self, event))
- def get_collision_history(self):
- history = collections.defaultdict(int)
- for frame, intensity in self.history:
- history[frame] += intensity
- return history
- @staticmethod
- def _on_collision(weak_self, event):
- self = weak_self()
- if not self:
- return
- actor_type = get_actor_display_name(event.other_actor)
- self.hud.notification('Collision with %r' % actor_type)
- impulse = event.normal_impulse
- intensity = math.sqrt(impulse.x ** 2 + impulse.y ** 2 + impulse.z ** 2)
- self.history.append((event.frame, intensity))
- if len(self.history) > 4000:
- self.history.pop(0)
- # ==============================================================================
- # -- LaneInvasionSensor --------------------------------------------------------
- # ==============================================================================
- class LaneInvasionSensor(object):
- def __init__(self, parent_actor, hud):
- self.sensor = None
- # If the spawn object is not a vehicle, we cannot use the Lane Invasion Sensor
- if parent_actor.type_id.startswith("vehicle."):
- self._parent = parent_actor
- self.hud = hud
- world = self._parent.get_world()
- bp = world.get_blueprint_library().find('sensor.other.lane_invasion')
- self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent)
- # We need to pass the lambda a weak reference to self to avoid circular
- # reference.
- weak_self = weakref.ref(self)
- self.sensor.listen(lambda event: LaneInvasionSensor._on_invasion(weak_self, event))
- @staticmethod
- def _on_invasion(weak_self, event):
- self = weak_self()
- if not self:
- return
- lane_types = set(x.type for x in event.crossed_lane_markings)
- text = ['%r' % str(x).split()[-1] for x in lane_types]
- self.hud.notification('Crossed line %s' % ' and '.join(text))
- # ==============================================================================
- # -- GnssSensor ----------------------------------------------------------------
- # ==============================================================================
- class GnssSensor(object):
- def __init__(self, parent_actor):
- self.sensor = None
- self._parent = parent_actor
- self.lat = 0.0
- self.lon = 0.0
- world = self._parent.get_world()
- bp = world.get_blueprint_library().find('sensor.other.gnss')
- self.sensor = world.spawn_actor(bp, carla.Transform(carla.Location(x=1.0, z=2.8)), attach_to=self._parent)
- # We need to pass the lambda a weak reference to self to avoid circular
- # reference.
- weak_self = weakref.ref(self)
- self.sensor.listen(lambda event: GnssSensor._on_gnss_event(weak_self, event))
- @staticmethod
- def _on_gnss_event(weak_self, event):
- self = weak_self()
- if not self:
- return
- self.lat = event.latitude
- self.lon = event.longitude
- # ==============================================================================
- # -- IMUSensor -----------------------------------------------------------------
- # ==============================================================================
- class IMUSensor(object):
- def __init__(self, parent_actor):
- self.sensor = None
- self._parent = parent_actor
- self.accelerometer = (0.0, 0.0, 0.0)
- self.gyroscope = (0.0, 0.0, 0.0)
- self.compass = 0.0
- world = self._parent.get_world()
- bp = world.get_blueprint_library().find('sensor.other.imu')
- self.sensor = world.spawn_actor(
- bp, carla.Transform(), attach_to=self._parent)
- # We need to pass the lambda a weak reference to self to avoid circular
- # reference.
- weak_self = weakref.ref(self)
- self.sensor.listen(
- lambda sensor_data: IMUSensor._IMU_callback(weak_self, sensor_data))
- @staticmethod
- def _IMU_callback(weak_self, sensor_data):
- self = weak_self()
- if not self:
- return
- limits = (-99.9, 99.9)
- self.accelerometer = (
- max(limits[0], min(limits[1], sensor_data.accelerometer.x)),
- max(limits[0], min(limits[1], sensor_data.accelerometer.y)),
- max(limits[0], min(limits[1], sensor_data.accelerometer.z)))
- self.gyroscope = (
- max(limits[0], min(limits[1], math.degrees(sensor_data.gyroscope.x))),
- max(limits[0], min(limits[1], math.degrees(sensor_data.gyroscope.y))),
- max(limits[0], min(limits[1], math.degrees(sensor_data.gyroscope.z))))
- self.compass = math.degrees(sensor_data.compass)
- # ==============================================================================
- # -- RadarSensor ---------------------------------------------------------------
- # ==============================================================================
- class RadarSensor(object):
- def __init__(self, parent_actor):
- self.sensor = None
- self._parent = parent_actor
- bound_x = 0.5 + self._parent.bounding_box.extent.x
- bound_y = 0.5 + self._parent.bounding_box.extent.y
- bound_z = 0.5 + self._parent.bounding_box.extent.z
- self.velocity_range = 7.5 # m/s
- world = self._parent.get_world()
- self.debug = world.debug
- bp = world.get_blueprint_library().find('sensor.other.radar')
- bp.set_attribute('horizontal_fov', str(35))
- bp.set_attribute('vertical_fov', str(20))
- self.sensor = world.spawn_actor(
- bp,
- carla.Transform(
- carla.Location(x=bound_x + 0.05, z=bound_z + 0.05),
- carla.Rotation(pitch=5)),
- attach_to=self._parent)
- # We need a weak reference to self to avoid circular reference.
- weak_self = weakref.ref(self)
- self.sensor.listen(
- lambda radar_data: RadarSensor._Radar_callback(weak_self, radar_data))
- @staticmethod
- def _Radar_callback(weak_self, radar_data):
- self = weak_self()
- if not self:
- return
- # To get a numpy [[vel, altitude, azimuth, depth],...[,,,]]:
- # points = np.frombuffer(radar_data.raw_data, dtype=np.dtype('f4'))
- # points = np.reshape(points, (len(radar_data), 4))
- current_rot = radar_data.transform.rotation
- for detect in radar_data:
- azi = math.degrees(detect.azimuth)
- alt = math.degrees(detect.altitude)
- # The 0.25 adjusts a bit the distance so the dots can
- # be properly seen
- fw_vec = carla.Vector3D(x=detect.depth - 0.25)
- carla.Transform(
- carla.Location(),
- carla.Rotation(
- pitch=current_rot.pitch + alt,
- yaw=current_rot.yaw + azi,
- roll=current_rot.roll)).transform(fw_vec)
- def clamp(min_v, max_v, value):
- return max(min_v, min(value, max_v))
- norm_velocity = detect.velocity / self.velocity_range # range [-1, 1]
- r = int(clamp(0.0, 1.0, 1.0 - norm_velocity) * 255.0)
- g = int(clamp(0.0, 1.0, 1.0 - abs(norm_velocity)) * 255.0)
- b = int(abs(clamp(- 1.0, 0.0, - 1.0 - norm_velocity)) * 255.0)
- self.debug.draw_point(
- radar_data.transform.location + fw_vec,
- size=0.075,
- life_time=0.06,
- persistent_lines=False,
- color=carla.Color(r, g, b))
- # ==============================================================================
- # -- CameraManager -------------------------------------------------------------
- # ==============================================================================
- class CameraManager(object):
- def __init__(self, parent_actor, hud):
- self.sensor = None
- self.surface = None
- self._parent = parent_actor
- self.hud = hud
- self.recording = False
- bound_x = 0.5 + self._parent.bounding_box.extent.x
- bound_y = 0.5 + self._parent.bounding_box.extent.y
- bound_z = 0.5 + self._parent.bounding_box.extent.z
- Attachment = carla.AttachmentType
- self._camera_transforms = [
- (carla.Transform(carla.Location(x=-2.0 * bound_x, y=+0.0 * bound_y, z=2.0 * bound_z),
- carla.Rotation(pitch=8.0)), Attachment.SpringArm),
- (carla.Transform(carla.Location(x=+0.8 * bound_x, y=+0.0 * bound_y, z=1.3 * bound_z)), Attachment.Rigid),
- (
- carla.Transform(carla.Location(x=+1.9 * bound_x, y=+1.0 * bound_y, z=1.2 * bound_z)), Attachment.SpringArm),
- (carla.Transform(carla.Location(x=-2.8 * bound_x, y=+0.0 * bound_y, z=4.6 * bound_z),
- carla.Rotation(pitch=6.0)), Attachment.SpringArm),
- (carla.Transform(carla.Location(x=-1.0, y=-1.0 * bound_y, z=0.4 * bound_z)), Attachment.Rigid)]
- self.transform_index = 1
- self.sensors = [['sensor.camera.rgb', cc.Raw, 'Camera RGB']]
- world = self._parent.get_world()
- bp_library = world.get_blueprint_library()
- for item in self.sensors:
- bp = bp_library.find(item[0])
- bp.set_attribute('image_size_x', str(hud.dim[0]))
- bp.set_attribute('image_size_y', str(hud.dim[1]))
- bp.set_attribute('gamma', '2.2')
- # bp.set_attribute('sensor_tick', '0.02')
- item.append(bp)
- self.index = None
- def toggle_camera(self, player):
- self.transform_index = (self.transform_index + 1) % len(self._camera_transforms)
- self.set_sensor(self.index, player, notify=False, force_respawn=True)
- def set_sensor(self, index, player, notify=True, force_respawn=False):
- index = index % len(self.sensors)
- needs_respawn = True if self.index is None else \
- (force_respawn or (self.sensors[index][2] != self.sensors[self.index][2]))
- if needs_respawn:
- if self.sensor is not None:
- self.sensor.destroy()
- self.surface = None
- self.sensor = self._parent.get_world().spawn_actor(
- self.sensors[index][-1],
- self._camera_transforms[self.transform_index][0],
- attach_to=self._parent,
- attachment_type=self._camera_transforms[self.transform_index][1])
- # We need to pass the lambda a weak reference to self to avoid
- # circular reference.
- weak_self = weakref.ref(self)
- self.sensor.listen(lambda image: CameraManager._parse_image(weak_self, image, player))
- if notify:
- self.hud.notification(self.sensors[index][2])
- self.index = index
- def toggle_recording(self):
- self.recording = not self.recording
- self.hud.notification('Recording %s' % ('On' if self.recording else 'Off'))
- def render(self, display):
- if self.surface is not None:
- display.blit(self.surface, (0, 0))
- @staticmethod
- def _parse_image(weak_self, image, player):
- self = weak_self()
- if not self:
- return
- if self.sensors[self.index][0].startswith('sensor.lidar'):
- points = np.frombuffer(image.raw_data, dtype=np.dtype('f4'))
- points = np.reshape(points, (int(points.shape[0] / 4), 4))
- lidar_data = np.array(points[:, :2])
- lidar_data *= min(self.hud.dim) / (2.0 * self.lidar_range)
- lidar_data += (0.5 * self.hud.dim[0], 0.5 * self.hud.dim[1])
- lidar_data = np.fabs(lidar_data) # pylint: disable=E1111
- lidar_data = lidar_data.astype(np.int32)
- lidar_data = np.reshape(lidar_data, (-1, 2))
- lidar_img_size = (self.hud.dim[0], self.hud.dim[1], 3)
- lidar_img = np.zeros((lidar_img_size), dtype=np.uint8)
- lidar_img[tuple(lidar_data.T)] = (255, 255, 255)
- self.surface = pygame.surfarray.make_surface(lidar_img)
- elif self.sensors[self.index][0].startswith('sensor.camera.dvs'):
- # Example of converting the raw_data from a carla.DVSEventArray
- # sensor into a NumPy array and using it as an image
- dvs_events = np.frombuffer(image.raw_data, dtype=np.dtype([
- ('x', np.uint16), ('y', np.uint16), ('t', np.int64), ('pol', np.bool)]))
- dvs_img = np.zeros((image.height, image.width, 3), dtype=np.uint8)
- # Blue is positive, red is negative
- dvs_img[dvs_events[:]['y'], dvs_events[:]['x'], dvs_events[:]['pol'] * 2] = 255
- self.surface = pygame.surfarray.make_surface(dvs_img.swapaxes(0, 1))
- elif self.sensors[self.index][0].startswith('sensor.camera.optical_flow'):
- image = image.get_color_coded_flow()
- array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8"))
- array = np.reshape(array, (image.height, image.width, 4))
- array = array[:, :, :3]
- array = array[:, :, ::-1]
- self.surface = pygame.surfarray.make_surface(array.swapaxes(0, 1))
- else:
- image.convert(self.sensors[self.index][1])
- array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8"))
- array = np.reshape(array, (image.height, image.width, 4))
- array = array[:, :, :3]
- array = array[:, :, ::-1]
- self.surface = pygame.surfarray.make_surface(array.swapaxes(0, 1))
- if self.recording:
- global number_record
- global rannumber
- number_record = number_record + 1
- if number_record > 15 and number_record <= 140:
- print("number_record"+str(number_record))
- frameid = image.frame + rannumber
- image.save_to_disk('/home/vangogh/code/fileSystem/downloadDir/simulation_result/%08d' % frameid)
- print("frameid"+str(frameid))
- image_name = "%08d" % frameid
- steering = player.get_control().steer
- list_1 = [str(image_name) + ".png", steering, steering * 4]
- #list_1 = [str(image_name) + ".png", str(seed_name), steering * 4, number_record-15]
- file_path = '/home/vangogh/code/fileSystem/downloadDir/simulation_result/label_test.csv'
- with open(file_path, 'a+', encoding='utf-8') as f:
- csv_writer = csv.writer(f)
- csv_writer.writerow(list_1)
- print("***********************len*****************************")
- print(number_record)
- # ==============================================================================
- # -- game_loop() ---------------------------------------------------------------
- # ==============================================================================
- def game_loop(args):
- pygame.init()
- pygame.font.init()
- world = None
- try:
- client = carla.Client(args.host, args.port)
- client.set_timeout(200) # 200
- sim_world = client.get_world()
- display = pygame.display.set_mode(
- (args.width, args.height),
- pygame.HWSURFACE | pygame.DOUBLEBUF)
- display.fill((0, 0, 0))
- pygame.display.flip()
- hud = HUD(args.width, args.height)
- world = World(sim_world, hud, args)
- controller = KeyboardControl(world, args.autopilot)
- sim_world.wait_for_tick()
- clock = pygame.time.Clock()
- while True:
- clock.tick_busy_loop(60)
- if controller.parse_events(client, world, clock):
- return
- if not world.tick(clock):
- return
- world.render(display)
- pygame.display.flip()
- finally:
- if (world and world.recording_enabled):
- client.stop_recorder()
- if world is not None:
- # prevent destruction of ego vehicle
- if args.keep_ego_vehicle:
- world.player = None
- world.destroy()
- pygame.quit()
- sys.exit()
- # ==============================================================================
- # -- main() --------------------------------------------------------------------
- # ==============================================================================
- def main():
- argparser = argparse.ArgumentParser(
- description='CARLA Manual Control Client')
- argparser.add_argument(
- '-v', '--verbose',
- action='store_true',
- dest='debug',
- help='print debug information')
- argparser.add_argument(
- '--host',
- metavar='H',
- default='127.0.0.1',
- help='IP of the host server (default: 127.0.0.1)')
- argparser.add_argument(
- '-p', '--port',
- metavar='P',
- default=3000,
- type=int,
- help='TCP port to listen to (default: 3000)')
- argparser.add_argument(
- '-a', '--autopilot',
- action='store_true',
- help='enable autopilot. This does not autocomplete the scenario')
- argparser.add_argument(
- '--rolename',
- metavar='NAME',
- default='hero',
- help='role name of ego vehicle to control (default: "hero")')
- argparser.add_argument(
- '--res',
- metavar='WIDTHxHEIGHT',
- default='1280x720',
- help='window resolution (default: 1280x720)')
- argparser.add_argument(
- '--keep_ego_vehicle',
- action='store_true',
- help='do not destroy ego vehicle on exit')
- argparser.add_argument( # add name
- '--name',
- type=str,
- help='seed name')
- args = argparser.parse_args()
- args.width, args.height = [int(x) for x in args.res.split('x')]
- log_level = logging.DEBUG if args.debug else logging.INFO
- logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level)
- logging.info('listening to server %s:%s', args.host, args.port)
- print(__doc__)
- global seed_name
- seed_name = args.name
- try:
- game_loop(args)
- except KeyboardInterrupt:
- print('\nCancelled by user. Bye!')
- except Exception as error:
- logging.exception(error)
- if __name__ == '__main__':
- main()
|