import os
import logging
from typing import Optional, Union, Tuple, List
import yaml
from omegaconf import ListConfig, DictConfig
import numpy as np
import commonroad_reach.pycrreach as reach
from commonroad.scenario.scenario import Scenario
from commonroad.scenario.trajectory import State
from commonroad.planning.goal import GoalRegion
from commonroad.planning.planning_problem import PlanningProblem
from commonroad.common.solution import VehicleType
from commonroad_dc.feasibility.vehicle_dynamics import VehicleParameterMapping
from commonroad_dc.pycrccosy import CurvilinearCoordinateSystem
from commonroad_dc.geometry.util import resample_polyline
from commonroad_route_planner.route_planner import RoutePlanner
import commonroad_reach.utility.logger as util_logger
from commonroad_reach.utility import configuration as util_configuration
from commonroad_reach.utility import general as util_general
logger = logging.getLogger(__name__)
[docs]class Configuration:
"""
Class holding all relevant configurations.
"""
def __init__(self, config_omega: Union[ListConfig, DictConfig]):
self.name_scenario = config_omega.general.name_scenario
self.scenario: Optional[Scenario] = None
self.planning_problem: Optional[PlanningProblem] = None
self.general: GeneralConfiguration = GeneralConfiguration(config_omega)
self.planning: PlanningConfiguration = PlanningConfiguration(config_omega)
self.vehicle: VehicleConfiguration = VehicleConfiguration(config_omega)
self.reachable_set: ReachableSetConfiguration = ReachableSetConfiguration(config_omega)
self.debug: DebugConfiguration = DebugConfiguration(config_omega)
[docs] def update(self, scenario: Scenario = None,
planning_problem: PlanningProblem = None, idx_planning_problem: int = 0,
state_initial: State = None, goal_region: GoalRegion = None,
CLCS: CurvilinearCoordinateSystem = None, list_ids_lanelets: List[int] = None):
"""
Updates configuration based on the given attributes.
Possible ways of completing the configuration:
1. Empty attributes: loads scenario and planning problem from the xml files, computes route and CLCS
2. Scenario + planning problem (+ initial state): computes route and CLCS
3. Scenario + initial state + goal region: computes route and CLCS
4. Scenario + initial state + CLCS: retrieve reference path from CLCS
Additionally, a list of lanelet IDs (e.g., from a given route) can be passed to restrict the reachable set
computation a-priori to a desired set of lanelets.
"""
# patterns that do not require loading scenario and planning problem from xml files.
if scenario and (planning_problem or
(state_initial and goal_region) or
(state_initial and CLCS)):
pass
else:
scenario, planning_problem = util_general.load_scenario_and_planning_problem(self, idx_planning_problem)
self.scenario = scenario
self.planning_problem = planning_problem
self.planning.state_initial = state_initial
self.planning.goal_region = goal_region
self.planning.CLCS = CLCS
self.planning.list_ids_lanelets = list_ids_lanelets
self.planning.update_configuration(self)
self.vehicle.update_configuration(self)
self.reachable_set.update_configuration(self)
[docs] def print_configuration_summary(self):
"""
Prints a summary of the configuration.
"""
dict_clcs_to_string = {"CART": "cartesian", "CVLN": "curvilinear"}
dict_mode_computation_to_string = {1: "polytopic, python backend", 2: "polytopic, c++ backend",
3: "graph-based (online)", 4: "graph-based (offline)"}
dict_mode_repartition_to_string = {1: "repartition, collision check", 2: "collision check, repartition",
3: "repartition, collision check, then repartition"}
dict_mode_inflation_to_string = {1: "inscribed circle", 2: "circumscribed circle",
3: "three circle approximation"}
CLCS = dict_clcs_to_string[self.planning.coordinate_system]
mode_computation = dict_mode_computation_to_string[self.reachable_set.mode_computation]
mode_repartition = dict_mode_repartition_to_string[self.reachable_set.mode_repartition]
mode_inflation = dict_mode_inflation_to_string[self.reachable_set.mode_inflation]
string = "# ===== Configuration Summary ===== #\n"
string += f"# {self.scenario.scenario_id}\n"
string += "# Planning:\n"
string += f"# \tdt: {self.planning.dt}\n"
string += f"# \tsteps: {self.planning.steps_computation}\n"
string += f"# \tcoordinate system: {CLCS}\n"
config_ego = self.vehicle.ego
string += "# Vehicle (Ego):\n"
string += f"# \tvehicle type id: {self.vehicle.ego.id_type_vehicle}\n"
string += f"# \tv: lon_min = {config_ego.v_lon_min}, lon_max = {config_ego.v_lon_max}, " \
f"lat_min = {config_ego.v_lat_min}, lat_max = {config_ego.v_lat_max}, max = {config_ego.v_max}\n"
string += f"# \ta: lon_min = {config_ego.a_lon_min}, lon_max = {config_ego.a_lon_max}, " \
f"lat_min = {config_ego.a_lat_min}, lat_max = {config_ego.a_lat_max}, max = {config_ego.a_max}\n"
string += "# Reachable set:\n"
string += f"# \tcomputation mode: {mode_computation}\n"
string += f"# \trepartition mode: {mode_repartition}\n"
string += f"# \tinflation mode: {mode_inflation}\n"
string += f"# \tobstacle rasterization: {self.reachable_set.rasterize_obstacles}\n"
string += f"# \tgrid size: {self.reachable_set.size_grid}\n"
string += f"# \tsplit radius: {self.reachable_set.radius_terminal_split}\n"
string += f"# \tprune: {self.reachable_set.prune_nodes_not_reaching_final_step}\n"
string += f"# \tnum threads: {self.reachable_set.num_threads}\n"
string += "# ================================= #"
for line in string.split("\n"):
util_logger.print_and_log_info(logger, line)
[docs] def save(self, path_save: str, name_file: str):
"""
Saves configuration to a yaml file.
"""
dict_save = {"name_scenario": self.name_scenario}
for name_obj, obj in self.__dict__.items():
try:
dict_obj = obj.to_dict()
except AttributeError:
continue
else:
dict_save.update({name_obj: dict_obj})
with open(f'{path_save}/{name_file}.yml', 'w') as file_yaml:
yaml.dump(dict_save, file_yaml, default_flow_style=False, allow_unicode=True)
[docs] def convert_to_cpp_configuration(self) -> reach.Configuration:
"""
Converts to a configuration that is readable by the C++ binding code.
"""
config = reach.Configuration()
config.general.name_scenario = self.name_scenario
config.general.path_scenarios = self.general.path_scenarios
config.vehicle.ego.id_type_vehicle = self.vehicle.ego.id_type_vehicle
config.vehicle.ego.length = self.vehicle.ego.length
config.vehicle.ego.width = self.vehicle.ego.width
config.vehicle.ego.v_lon_min = self.vehicle.ego.v_lon_min
config.vehicle.ego.v_lon_max = self.vehicle.ego.v_lon_max
config.vehicle.ego.v_lat_min = self.vehicle.ego.v_lat_min
config.vehicle.ego.v_lat_max = self.vehicle.ego.v_lat_max
config.vehicle.ego.a_lon_min = self.vehicle.ego.a_lon_min
config.vehicle.ego.a_lon_max = self.vehicle.ego.a_lon_max
config.vehicle.ego.a_lat_min = self.vehicle.ego.a_lat_min
config.vehicle.ego.a_lat_max = self.vehicle.ego.a_lat_max
config.vehicle.ego.a_max = self.vehicle.ego.a_max
config.vehicle.ego.radius_disc = self.vehicle.ego.radius_disc
config.vehicle.ego.circle_distance = self.vehicle.ego.circle_distance
config.vehicle.ego.wheelbase = self.vehicle.ego.wheelbase
config.vehicle.other.id_type_vehicle = self.vehicle.other.id_type_vehicle
config.vehicle.other.length = self.vehicle.other.length
config.vehicle.other.width = self.vehicle.other.width
config.vehicle.other.v_lon_min = self.vehicle.other.v_lon_min
config.vehicle.other.v_lon_max = self.vehicle.other.v_lon_max
config.vehicle.other.v_lat_min = self.vehicle.other.v_lat_min
config.vehicle.other.v_lat_max = self.vehicle.other.v_lat_max
config.vehicle.other.a_lon_min = self.vehicle.other.a_lon_min
config.vehicle.other.a_lon_max = self.vehicle.other.a_lon_max
config.vehicle.other.a_lat_min = self.vehicle.other.a_lat_min
config.vehicle.other.a_lat_max = self.vehicle.other.a_lat_max
config.vehicle.other.a_max = self.vehicle.other.a_max
config.vehicle.other.radius_disc = self.vehicle.other.radius_disc
config.vehicle.other.circle_distance = self.vehicle.other.circle_distance
config.vehicle.other.wheelbase = self.vehicle.other.wheelbase
config.planning.dt = self.planning.dt
config.planning.step_start = self.planning.step_start
config.planning.steps_computation = self.planning.steps_computation
config.planning.p_lon_initial = self.planning.p_lon_initial
config.planning.p_lat_initial = self.planning.p_lat_initial
config.planning.uncertainty_p_lon = self.planning.uncertainty_p_lon
config.planning.uncertainty_p_lat = self.planning.uncertainty_p_lat
config.planning.v_lon_initial = self.planning.v_lon_initial
config.planning.v_lat_initial = self.planning.v_lat_initial
config.planning.uncertainty_v_lon = self.planning.uncertainty_v_lon
config.planning.uncertainty_v_lat = self.planning.uncertainty_v_lat
if self.planning.coordinate_system == "CART":
config.planning.coordinate_system = reach.CoordinateSystem.CARTESIAN
else:
config.planning.coordinate_system = reach.CoordinateSystem.CURVILINEAR
config.planning.CLCS = self.planning.CLCS
if self.planning.reference_point == "REAR":
config.planning.reference_point = reach.ReferencePoint.REAR
else:
config.planning.reference_point = reach.ReferencePoint.CENTER
config.reachable_set.mode_repartition = self.reachable_set.mode_repartition
config.reachable_set.mode_inflation = self.reachable_set.mode_inflation
config.reachable_set.size_grid = self.reachable_set.size_grid
config.reachable_set.size_grid_2nd = self.reachable_set.size_grid_2nd
config.reachable_set.radius_terminal_split = self.reachable_set.radius_terminal_split
config.reachable_set.num_threads = self.reachable_set.num_threads
config.reachable_set.prune_nodes = self.reachable_set.prune_nodes_not_reaching_final_step
config.reachable_set.rasterize_obstacles = self.reachable_set.rasterize_obstacles
# convert lut dict to Cpp configuration via PyBind function
if self.reachable_set.mode_inflation == 3:
config.reachable_set.lut_lon_enlargement = \
reach.LUTLongitudinalEnlargement(self.reachable_set.lut_longitudinal_enlargement)
return config
class ConfigurationBase:
def to_dict(self):
dict_config = dict()
for key, val in self.__dict__.items():
if isinstance(val, np.float64):
val = float(val)
if isinstance(val, (str, int, float, bool)):
dict_config[key] = val
return dict_config
class GeneralConfiguration(ConfigurationBase):
def __init__(self, config: Union[ListConfig, DictConfig]):
config_relevant = config.general
name_scenario = config_relevant.name_scenario
self.path_scenarios = config_relevant.path_scenarios
self.path_scenario = config_relevant.path_scenarios + name_scenario + ".xml"
self.path_output = config_relevant.path_output + name_scenario + "/"
self.path_logs = config_relevant.path_logs
self.path_offline_data = config_relevant.path_offline_data
class VehicleConfiguration(ConfigurationBase):
class Ego:
def __init__(self, config: Union[ListConfig, DictConfig]):
config_relevant = config.vehicle.ego
self.id_type_vehicle = config_relevant.id_type_vehicle
# load vehicle parameters according to id_type_vehicle
try:
vehicle_parameters = VehicleParameterMapping.from_vehicle_type(VehicleType(self.id_type_vehicle))
except KeyError:
raise Exception(f"Given vehicle type id {self.id_type_vehicle} is not valid.")
self.length = vehicle_parameters.l
self.width = vehicle_parameters.w
self.v_lon_min = vehicle_parameters.longitudinal.v_min
self.v_lon_max = vehicle_parameters.longitudinal.v_max
# not present in vehicle parameters
self.v_lat_min = None
self.v_lat_max = None
self.v_max = vehicle_parameters.longitudinal.v_max
self.a_lon_min = -vehicle_parameters.longitudinal.a_max
self.a_lon_max = vehicle_parameters.longitudinal.a_max
# not present in vehicle parameters
self.a_lat_min = None
self.a_lat_max = None
self.a_max = vehicle_parameters.longitudinal.a_max
# distances front/rear axle to vehicle center
self.wb_front_axle = vehicle_parameters.a
self.wb_rear_axle = vehicle_parameters.b
# wheelbase
self.wheelbase = self.wb_front_axle + self.wb_rear_axle
# overwrite with parameters given by vehicle ID if they are explicitly provided in the *.yaml file
for key, value in config_relevant.items():
if value is not None:
setattr(self, key, value)
self.radius_disc, self.circle_distance = \
util_configuration.compute_disc_radius_and_distance(self.length, self.width,
ref_point=config.planning.reference_point,
dist_axle_rear=self.wb_rear_axle)
self.radius_inflation = util_configuration.compute_inflation_radius(config.reachable_set.mode_inflation,
self.length, self.width,
self.radius_disc)
self.update_configuration(config)
def update_configuration(self, config):
# overwrite velocity and acceleration parameters if computing within Cartesian coordinate system
if config.planning.coordinate_system == "CART":
self.v_lon_min = self.v_lat_min = -self.v_max
self.v_lon_max = self.v_lat_max = self.v_max
self.a_lon_min = self.a_lat_min = -self.a_max
self.a_lon_max = self.a_lat_max = self.a_max
class Other:
def __init__(self, config: Union[ListConfig, DictConfig]):
config_relevant = config.vehicle.other
self.id_type_vehicle = config_relevant.id_type_vehicle
# load vehicle parameters according to id_type_vehicle
try:
vehicle_parameters = VehicleParameterMapping.from_vehicle_type(VehicleType(self.id_type_vehicle))
except KeyError:
raise Exception(f"Given vehicle type id {self.id_type_vehicle} is not valid.")
self.length = vehicle_parameters.l
self.width = vehicle_parameters.w
self.v_lon_min = vehicle_parameters.longitudinal.v_min
self.v_lon_max = vehicle_parameters.longitudinal.v_max
# not present in vehicle parameters
self.v_lat_min = None
self.v_lat_max = None
self.v_max = vehicle_parameters.longitudinal.v_max
self.a_lon_min = -vehicle_parameters.longitudinal.a_max
self.a_lon_max = vehicle_parameters.longitudinal.a_max
# not present in vehicle parameters
self.a_lat_min = None
self.a_lat_max = None
self.a_max = vehicle_parameters.longitudinal.a_max
# distances front/rear axle to vehicle center
self.wb_front_axle = vehicle_parameters.a
self.wb_rear_axle = vehicle_parameters.b
# wheelbase
self.wheelbase = self.wb_front_axle + self.wb_rear_axle
# overwrite with parameters in the config file
for key, value in config_relevant.items():
if value is not None:
setattr(self, key, value)
self.radius_disc, self.circle_distance = \
util_configuration.compute_disc_radius_and_distance(self.length, self.width,
ref_point=config.planning.reference_point,
dist_axle_rear=self.wb_rear_axle)
self.radius_inflation = util_configuration.compute_inflation_radius(config.reachable_set.mode_inflation,
self.length, self.width,
self.radius_disc)
def __init__(self, config: Union[ListConfig, DictConfig]):
self.ego = VehicleConfiguration.Ego(config)
self.other = VehicleConfiguration.Other(config)
def update_configuration(self, config: Configuration):
self.ego.update_configuration(config)
def to_dict(self):
dict_config = {"ego": dict(), "other": dict()}
for key, val in self.ego.__dict__.items():
if isinstance(val, np.float64):
val = float(val)
if isinstance(val, (str, int, float, bool)):
dict_config["ego"][key] = val
for key, val in self.other.__dict__.items():
if isinstance(val, np.float64):
val = float(val)
if isinstance(val, (str, int, float, bool)):
dict_config["other"][key] = val
return dict_config
class PlanningConfiguration(ConfigurationBase):
def __init__(self, config: Union[ListConfig, DictConfig]):
config_relevant = config.planning
self.dt = config_relevant.dt
self.step_start = config_relevant.step_start
self.steps_computation = config_relevant.steps_computation
self.p_lon_initial = None
self.p_lat_initial = None
self.uncertainty_p_lon = config_relevant.uncertainty_p_lon
self.uncertainty_p_lat = config_relevant.uncertainty_p_lat
self.v_lon_initial = None
self.v_lat_initial = None
self.uncertainty_v_lon = config_relevant.uncertainty_v_lon
self.uncertainty_v_lat = config_relevant.uncertainty_v_lat
self.o_initial = None
# related to specific planning problem
self.state_initial = None
self.goal_region = None
self.reference_path = None
self.lanelet_network = None
self.list_ids_lanelets = None
self.CLCS = None
self.coordinate_system = config_relevant.coordinate_system
self.reference_point = config_relevant.reference_point
@property
def p_initial(self):
return np.array([self.p_lon_initial, self.p_lat_initial])
@p_initial.setter
def p_initial(self, p_initial: Tuple):
assert (type(p_initial is tuple)), "Initial lon/lat position must be of type tuple with length 2"
assert (len(p_initial) == 2), "Initial lon/lat position must be of type tuple with length 2"
self.p_lon_initial = p_initial[0]
self.p_lat_initial = p_initial[1]
@property
def v_initial(self):
return np.array([self.v_lon_initial, self.v_lat_initial])
@v_initial.setter
def v_initial(self, v_initial: Tuple):
assert (type(v_initial is tuple)), "Initial lon/lat velocity must be of type tuple with length 2"
assert (len(v_initial) == 2), "Initial lon/lat velocity must be of type tuple with length 2"
self.v_lon_initial = v_initial[0]
self.v_lat_initial = v_initial[1]
def update_configuration(self, config: Configuration):
scenario = config.scenario
planning_problem = config.planning_problem
self.lanelet_network = scenario.lanelet_network if not self.list_ids_lanelets \
else util_general.create_lanelet_network_from_ids(scenario.lanelet_network, self.list_ids_lanelets)
self.step_start = planning_problem.initial_state.time_step \
if not self.state_initial else self.state_initial.time_step
assert round(self.dt * 100) % round(scenario.dt * 100) == 0, \
f"Value of dt ({self.dt}) should be a multiple of scenario dt ({scenario.dt})."
if self.coordinate_system == "CART":
p_initial, v_initial, o_initial = util_configuration.compute_initial_state_cart(config)
self.p_lon_initial, self.p_lat_initial = p_initial
self.v_lon_initial, self.v_lat_initial = v_initial
self.o_initial = o_initial
v_max = config.vehicle.ego.v_max
assert -v_max <= self.v_lon_initial <= v_max, \
f"Initial x velocity {self.v_lon_initial} exceeds valid velocity interval [{-v_max}, {v_max}]."
assert -v_max <= self.v_lat_initial <= v_max, \
f"Initial y velocity {self.v_lat_initial} exceeds valid velocity interval [{-v_max}, {v_max}]."
elif self.coordinate_system == "CVLN":
if self.CLCS:
# CLCS is given, retrieve reference path
self.reference_path = np.array(self.CLCS.reference_path())
else:
# plans a route from the initial lanelet to the goal lanelet, set curvilinear coordinate system
route_planner = RoutePlanner(lanelet_network=scenario.lanelet_network,
planning_problem=planning_problem,
state_initial=self.state_initial,
goal_region=self.goal_region)
candidate_holder = route_planner.plan_routes()
route = candidate_holder.retrieve_first_route()
if route:
ref_path_mod = resample_polyline(route.reference_path, 0.5)
self.reference_path = ref_path_mod
self.CLCS = util_configuration.create_curvilinear_coordinate_system(self.reference_path)
self.reference_path = np.array(self.CLCS.reference_path())
p_initial, v_initial = util_configuration.compute_initial_state_cvln(config, self.state_initial)
self.p_lon_initial, self.p_lat_initial = p_initial
self.v_lon_initial, self.v_lat_initial = v_initial
assert config.vehicle.ego.v_lon_min <= self.v_lon_initial <= config.vehicle.ego.v_lon_max, \
f"Initial longitudinal velocity {self.v_lon_initial} exceeds valid velocity interval " \
f"[{config.vehicle.ego.v_lon_min}, {config.vehicle.ego.v_lon_max}]."
assert config.vehicle.ego.v_lat_min <= self.v_lat_initial <= config.vehicle.ego.v_lat_max, \
f"Initial lateral velocity {self.v_lat_initial} exceeds valid velocity interval " \
f"[{config.vehicle.ego.v_lat_min}, {config.vehicle.ego.v_lat_max}]."
class ReachableSetConfiguration(ConfigurationBase):
def __init__(self, config: Union[ListConfig, DictConfig]):
config_relevant = config.reachable_set
self.mode_computation = config_relevant.mode_computation
self.mode_repartition = config_relevant.mode_repartition
self.mode_inflation = config_relevant.mode_inflation
self.consider_traffic = config_relevant.consider_traffic
self.rasterize_obstacles = config_relevant.rasterize_obstacles
self.size_grid = config_relevant.size_grid
self.size_grid_2nd = config_relevant.size_grid_2nd
self.radius_terminal_split = config_relevant.radius_terminal_split
self.prune_nodes_not_reaching_final_step = config_relevant.prune_nodes_not_reaching_final_step
self.name_pickle_offline = config_relevant.name_pickle_offline
self.n_multi_steps = config_relevant.n_multi_steps
self.num_threads = config_relevant.num_threads
self.path_to_lut = os.path.abspath(
os.path.join(config.general.path_scenarios, "..", config_relevant.path_to_lut))
self.lut_longitudinal_enlargement = None
def update_configuration(self, config: Configuration):
if self.mode_inflation == 3:
self.lut_longitudinal_enlargement = util_configuration.read_lut_longitudinal_enlargement(
config.planning.reference_point, config.vehicle.ego.circle_distance, self.path_to_lut)
class DebugConfiguration(ConfigurationBase):
def __init__(self, config: Union[ListConfig, DictConfig]):
config_relevant = config.debug
self.save_plots = config_relevant.save_plots
self.save_config = config_relevant.save_config
self.verbose_debug = config_relevant.verbose_debug
self.verbose_info = config_relevant.verbose_info
self.draw_ref_path = config_relevant.draw_ref_path
self.draw_planning_problem = config_relevant.draw_planning_problem
self.draw_icons = config_relevant.draw_icons
self.draw_lanelet_labels = config_relevant.draw_lanelet_labels
self.plot_limits = config_relevant.plot_limits
self.plot_azimuth = config_relevant.plot_azimuth
self.plot_elevation = config_relevant.plot_elevation
self.ax_distance = config_relevant.ax_distance