Source code for commonroad_dc.feasibility.solution_checker

import math
from typing import Tuple, Dict

import numpy as np
from commonroad.common.solution import PlanningProblemSolution, TrajectoryType, Solution, VehicleModel
from commonroad.geometry.shape import Polygon, ShapeGroup
from commonroad.planning.planning_problem import PlanningProblemSet
from commonroad.prediction.prediction import TrajectoryPrediction
from commonroad.scenario.obstacle import StaticObstacle, ObstacleType
from commonroad.scenario.scenario import Scenario
from commonroad.scenario.trajectory import State as StateTupleFactory, Trajectory, State
from commonroad_dc.pycrcc import CollisionChecker, CollisionObject

import commonroad_dc.feasibility.feasibility_checker as fc
from commonroad_dc.boundary import construction
from commonroad_dc.collision.collision_detection.pycrcc_collision_dispatch import create_collision_object, \
    create_collision_checker
from commonroad_dc.feasibility.feasibility_checker import FeasibilityException
from commonroad_dc.feasibility.vehicle_dynamics import VehicleDynamics


[docs]class SolutionCheckerException(Exception): """ Main exception class for exceptions related to the solution checker. """ pass
[docs]class CollisionException(SolutionCheckerException): """ Exception class for exceptions related to the collisions. """ pass
[docs]class GoalNotReachedException(SolutionCheckerException): """ Exception class for exceptions related to the goal checks. """ pass
[docs]class MissingSolutionException(SolutionCheckerException): """ Exception class for exceptions related to the planning problem solutions. """ pass
def _simulate_trajectory_if_input_vector(planning_problem_set: PlanningProblemSet, pp_solution: PlanningProblemSolution, dt: float) -> Tuple[VehicleDynamics, Trajectory]: vehicle_dynamics = VehicleDynamics.from_model(pp_solution.vehicle_model, pp_solution.vehicle_type) trajectory = pp_solution.trajectory if pp_solution.trajectory_type in [TrajectoryType.Input, TrajectoryType.PMInput]: initial_state = planning_problem_set.planning_problem_dict[pp_solution.planning_problem_id].initial_state trajectory = vehicle_dynamics.simulate_trajectory(initial_state, trajectory, dt) return vehicle_dynamics, trajectory def _create_pp_solution_collision_object(planning_problem_set: PlanningProblemSet, pp_solution: PlanningProblemSolution, dt: float) -> CollisionObject: vehicle_dynamics, trajectory = _simulate_trajectory_if_input_vector(planning_problem_set, pp_solution, dt) trajectory_pred = TrajectoryPrediction(trajectory=trajectory, shape=vehicle_dynamics.shape) collision_object = create_collision_object(trajectory_pred) return collision_object def _construct_boundary_checker(scenario: Scenario) -> CollisionChecker: build = ['section_triangles', 'triangulation'] boundary = construction.construct(scenario, build) road_boundary_shape_list = [] initial_state = None for r in boundary['triangulation'].unpack(): initial_state = StateTupleFactory(position=np.array([0, 0]), orientation=0.0, time_step=0) p = Polygon(np.array(r.vertices())) road_boundary_shape_list.append(p) road_bound = StaticObstacle(obstacle_id=scenario.generate_object_id(), obstacle_type=ObstacleType.ROAD_BOUNDARY, obstacle_shape=ShapeGroup(road_boundary_shape_list), initial_state=initial_state) collision_checker = CollisionChecker() collision_checker.add_collision_object(create_collision_object(road_bound)) return collision_checker def _check_input_vector_feasibility(pp_solution: PlanningProblemSolution, initial_state: State, vehicle_dynamics: VehicleDynamics, dt: float) -> Tuple[bool, Trajectory]: try: feasible, simulated_trajectory = fc.input_vector_feasibility(initial_state, pp_solution.trajectory, vehicle_dynamics, dt) return feasible, simulated_trajectory except FeasibilityException as ex: msg = f'Solution is not feasible on planning problem solution {pp_solution.planning_problem_id}.' raise SolutionCheckerException(msg) from ex def _check_trajectory_feasibility(pp_solution: PlanningProblemSolution, vehicle_dynamics: VehicleDynamics, dt: float) -> Tuple[bool, Trajectory]: try: feasible, reconstructed_inputs = fc.trajectory_feasibility(pp_solution.trajectory, vehicle_dynamics, dt) return feasible, reconstructed_inputs except FeasibilityException as ex: msg = f'Solution is not feasible on planning problem solution {pp_solution.planning_problem_id}.' raise SolutionCheckerException(msg) from ex
[docs]def solution_feasible(solution: Solution, dt: float, planning_problem_set: PlanningProblemSet) -> Dict[int, Tuple[bool, Trajectory, Trajectory]]: """ Checks whether the given solution is feasible. :param solution: Solution :param planning_problem_set: PlanningProblemSet :param dt: Scenario dt :return: planning problem id -> feasible, input or reconstructed input, trajectory or simulated trajectory. Raises FeasibilityException if there is any trajectory in the solution that is not feasible. """ results = {} for pp_solution in solution.planning_problem_solutions: planning_problem = planning_problem_set.planning_problem_dict[pp_solution.planning_problem_id] initial_state = planning_problem.initial_state vehicle_dynamics = VehicleDynamics.from_model(pp_solution.vehicle_model, pp_solution.vehicle_type) if pp_solution.trajectory_type in [TrajectoryType.Input, TrajectoryType.PMInput]: feasible, simulated_trajectory = _check_input_vector_feasibility(pp_solution, initial_state, vehicle_dynamics, dt) results[pp_solution.planning_problem_id] = (feasible, pp_solution.trajectory, simulated_trajectory) else: feasible, inputs = _check_trajectory_feasibility(pp_solution, vehicle_dynamics, dt) results[pp_solution.planning_problem_id] = (feasible, inputs, pp_solution.trajectory) return results
[docs]def obstacle_collision(scenario: Scenario, planning_problem_set: PlanningProblemSet, solution: Solution) -> bool: """ Checks whether there is a collision between the ego vehicles of the solution and the scenario obstacles. :param scenario: Scenario :param planning_problem_set: PlanningProblemSet :param solution: Solution :return: False if there is no collision. Raises CollisionException if there are any obstacle collisions. """ collision_checker = create_collision_checker(scenario) for pp_solution in solution.planning_problem_solutions: collision_object = _create_pp_solution_collision_object(planning_problem_set, pp_solution, scenario.dt) if collision_checker.collide(collision_object): msg = f'There is a collision between the scenario obstacles and the ego vehicle in planning ' \ f'problem solution {pp_solution.planning_problem_id}' raise CollisionException(msg) return False
[docs]def boundary_collision(scenario: Scenario, planning_problem_set: PlanningProblemSet, solution: Solution) -> bool: """ Checks whether the ego vehicles go out of lanelet boundaries. :param scenario: Scenario :param planning_problem_set: PlanningProblemSet :param solution: Solution :return: False if the ego vehicles stay in lanes. Raises CollisionException if there are any boundary collisions. """ collision_checker = _construct_boundary_checker(scenario) for pp_solution in solution.planning_problem_solutions: collision_object = _create_pp_solution_collision_object(planning_problem_set, pp_solution, scenario.dt) if collision_checker.collide(collision_object): msg = f'There is a collision between lanelet boundaries and the ego vehicle in planning ' \ f'problem solution {pp_solution.planning_problem_id}' raise CollisionException(msg) return False
[docs]def ego_collision(scenario: Scenario, planning_problem_set: PlanningProblemSet, solution: Solution) -> bool: """ Checks whether the ego vehicles collide with each other in the solution. :param scenario: Scenario :param planning_problem_set: PlanningProblemSet :param solution: Solution :return: False if there is no collision. Raises CollisionException if there are any collisions between ego vehicles. """ collision_checker = CollisionChecker() checked_pp_ids = [] for pp_solution in solution.planning_problem_solutions: collision_object = _create_pp_solution_collision_object(planning_problem_set, pp_solution, scenario.dt) if collision_checker.collide(collision_object): msg = f'There is a collision between ego vehicles in planning problem solutions {checked_pp_ids}' raise CollisionException(msg) collision_checker.add_collision_object(collision_object) checked_pp_ids.append(pp_solution.planning_problem_id) return False
[docs]def goal_reached(scenario: Scenario, planning_problem_set: PlanningProblemSet, solution: Solution) -> bool: """ Checks whether the goal has been reached for each of the planning problem solutions. :param scenario: Scenario :param planning_problem_set: PlanningProblemSet :param solution: Solution :return: True if all reached goal. Raises GoalNotReachedException if there are any planning problem solutions that have not reached the goal position. """ for pp_solution in solution.planning_problem_solutions: vehicle_dynamics, trajectory = _simulate_trajectory_if_input_vector(planning_problem_set, pp_solution, scenario.dt) planning_problem = planning_problem_set.planning_problem_dict[pp_solution.planning_problem_id] if not planning_problem.goal_reached(trajectory)[0]: msg = f'Ego vehicle has not reached the goal in planning planning problem solution ' \ f'{pp_solution.planning_problem_id}.' raise GoalNotReachedException(msg) return True
[docs]def solved_all_problems(planning_problem_set: PlanningProblemSet, solution: Solution) -> bool: """ Checks whether the solution has solved all planning problems of the scenario. :param planning_problem_set: PlanningProblemSet :param solution: Solution :return: True if there is a solution for each planning problem. Raises MissingSolutionException if there are any missing planning problem solutions. """ pp_ids = set(list(planning_problem_set.planning_problem_dict.keys())) solution_pp_ids = set(solution.planning_problem_ids) if not pp_ids == solution_pp_ids: msg = f'All planning problems of the scenario must be solved! Scenario planning problems: {pp_ids}, ' \ f'Solved planning problems: {solution_pp_ids}' raise MissingSolutionException(msg) return True
[docs]def starts_at_correct_state(solution: Solution, planning_problem_set: PlanningProblemSet) -> bool: """ Checks whether the given solution trajectories or input vectors start at the correct state/time step. Expected time steps are: - Equal to planning problem's initial time step, if input vector solution - Planning problem's initial state, if trajectory solution :param solution: Solution :param planning_problem_set: PlanningProblemSet :return: True if all solution trajectories start at correct time step """ for pp_solution in solution.planning_problem_solutions: planning_problem = planning_problem_set.planning_problem_dict[pp_solution.planning_problem_id] is_input_vector = pp_solution.trajectory_type in [TrajectoryType.Input, TrajectoryType.PMInput] initial_state_pp = planning_problem.initial_state initial_state_sol = pp_solution.trajectory.state_list[0] ts = initial_state_sol.time_step expected_ts = [initial_state_pp.time_step] if is_input_vector: if ts not in expected_ts: msg = f'Planning Problem Solutionn with input vector does not ' \ f'start at correct time step!\nPlanning Problem ID: {pp_solution.planning_problem_id}' \ f'\nExpected time step: {expected_ts}\nActual time step: {ts}' raise SolutionCheckerException(msg) else: for attr in initial_state_pp.attributes: if not hasattr(initial_state_sol, attr): continue solution_attr_tmp = getattr(initial_state_sol, attr) if pp_solution.vehicle_model == VehicleModel.PM: if attr == "orientation": solution_attr_tmp = math.atan2(initial_state_sol.velocity_y, initial_state_sol.velocity) elif attr == "velocity": solution_attr_tmp = math.sqrt(initial_state_sol.velocity_y**2 + initial_state_sol.velocity**2) if attr == "velocity": # tolerance required for motion primitives atol = 2.0 else: atol = 0.1 if not np.allclose(getattr(initial_state_pp, attr), solution_attr_tmp, atol=atol): msg = f'Planning Problem Solution trajectory does not ' \ f'start at the initial_state of the planning problem with ID: ' \ f'{pp_solution.planning_problem_id}' \ f'\nExpected {attr}={getattr(initial_state_pp, attr)}\n' \ f'Received {attr}={solution_attr_tmp}' raise SolutionCheckerException(msg) return True
[docs]def valid_solution(scenario: Scenario, planning_problem_set: PlanningProblemSet, solution: Solution) -> Tuple[bool, Dict[int, Tuple[bool, Trajectory, Trajectory]]]: """ Checks whether a solution is valid or not by checking - Solution has solved all planning problems of the scenario (solved_all_problems) - All planning problem solutions reached goal (goal_reached) - All planning problem solutions trajectories/input vectors start at correct time step (starts_at_correct_ts) - All planning problem solutions are feasible (solution_feasible) - There isn't a collision between the ego vehicles and the scenario obstacles (obstacle_collision) - The ego vehicles don't go out of lane boundaries (boundary_collision) - The ego vehicles don't collide with each other (ego_collision) It returns a dictionary that contains the inputs (or reconstructed inputs if trajectory solution), and trajectory (simulated trajectory if input vector solution). The valid_solution functions is being used as a basis for validity when a submission was made to `commonroad.in.tum.de website <https://commonroad.in.tum.de>`_ for evaluation. If the solution is not valid according to this function, then the solution will not be accepted. :param scenario: Scenario :param planning_problem_set: PlanningProblemSet :param solution: Solution :return: True if all checks pass successfully, and dictionary for simulated trajectories or reconstructed inputs. Raises SolutionCheckerException if any of the checks fail. """ valid = all([ solved_all_problems(planning_problem_set, solution), goal_reached(scenario, planning_problem_set, solution), starts_at_correct_state(solution, planning_problem_set), not obstacle_collision(scenario, planning_problem_set, solution), not boundary_collision(scenario, planning_problem_set, solution), not ego_collision(scenario, planning_problem_set, solution) ]) results = solution_feasible(solution, scenario.dt, planning_problem_set) all_feasible = all([ result[0] for pp_id, result in results.items() ]) return valid and all_feasible, results