diff --git a/.gitignore b/.gitignore index f90a9cf..ddb36bd 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,8 @@ ighastar/scripts/test_case_generation.py examples/ros_kinodynamic_example.py examples/ROS/ examples/BeamNG/ +ighastar/src/IGHAStarClass.py +ighastar/src/Nodes.py # Python __pycache__/ *.py[cod] diff --git a/Content/ROS/ros_example.png b/Content/ROS/ros_example.png new file mode 100644 index 0000000..8e7c4a4 Binary files /dev/null and b/Content/ROS/ros_example.png differ diff --git a/examples/ROS/Configs/ros_example.yml b/examples/ROS/Configs/ros_example.yml new file mode 100644 index 0000000..34d605a --- /dev/null +++ b/examples/ROS/Configs/ros_example.yml @@ -0,0 +1,39 @@ +Planner_config: + experiment_info_default: + resolution: 2.0 + epsilon: [2.0, 2.0, 1.57, 5.0] # ate, cte, heading, vel + tolerance: 0.125 + max_level: 5 + division_factor: 2 # Example division factor + max_expansions: 500 + hysteresis: 100 + node_info: + node_type: "kinodynamic" + length: 0.3 + width: 0.25 + map_res: 0.25 + dt: 0.05 + timesteps: 5 + step_size: 1.0 + steering_list: [-15.0, -5.0, 0.0, 5.0, 15.0] # in degrees. + throttle_list: [-4.0, -2.0, 0, 2.0, 4.0] + del_theta: 90.0 # in degrees + del_vel: 1.0 + RI: 0.5 + max_vert_acc: 5.0 + min_vel: 0.0 # don't go in reverse. + max_vel: 2.0 + max_theta: 30.0 # in degrees + gear_switch_time: 0.0 +topics: + elevation_map: "/elevation_mapping/elevation_map_cropped_cv2" + odom: "/mavros/local_position/odom" + waypoints: "/mavros/mission/waypoints" + path: "/planner_test/path" + diagnostics: "/planner_test/diagnostic" + goal_marker: "/planner_test/goal_marker" + global_position: "/mavros/global_position/global" + +blur_kernel: 0 +lethal_slope: 60 +wp_radius: 2.0 \ No newline at end of file diff --git a/examples/ROS/README.md b/examples/ROS/README.md new file mode 100644 index 0000000..25e53f5 --- /dev/null +++ b/examples/ROS/README.md @@ -0,0 +1,78 @@ +# IGHAStar ROS Example + +This folder contains a ROS-based example for the IGHA* path planner. The code here is really an example script to demonstrate how one would use the planner in the loop on their robot. +It demonstrates how to integrate the IGHA* planner with ROS topics for maps, odometry, waypoints, and visualization. +For simplicity (not needing to fiddle with rviz), there is a built in visualization in the example. +It is however not a "ros-package"; one can use the example shown here as a stencil to incorporate IGHA* into their navigation stack. + + +## Prerequisites +- ROS Noetic +- Python 3 +- IGHAStar C++/CUDA requirements (met with pip install -e .) + +## Setup (confirm these instructions) +1. **Install ROS dependencies:** + Assuming IGHA* has already been installed, + ```bash + # Install ROS packages for message types + sudo apt-get update + sudo apt-get install ros-noetic-grid-map-msgs \ + ros-noetic-mavros-msgs \ + ros-noetic-nav-msgs \ + ros-noetic-geometry-msgs \ + ros-noetic-sensor-msgs \ + ros-noetic-visualization-msgs \ + ros-noetic-diagnostic-msgs \ + ros-noetic-tf + + # Check for any missing dependencies + rosdep install --from-paths src --ignore-src -r -y + ``` + +2. **Verify installation:** + ```bash + # Test that all message types are available + python3 -c " + import rospy + from nav_msgs.msg import Path, Odometry + from mavros_msgs.msg import WaypointList + from grid_map_msgs.msg import GridMap + from visualization_msgs.msg import MarkerArray + from sensor_msgs.msg import NavSatFix + from diagnostic_msgs.msg import DiagnosticArray + from geometry_msgs.msg import PoseStamped, Quaternion + print('All ROS message types imported successfully!') + " + ``` + +## Configuration +- Edit `Configs/ros_example.yml` to set planner parameters, topic names, and costmap settings. + +## Running the Example + + +
+ ROS Example Visualization +
Fig. 1: Example ROS planner visualization. The window shows the costmap (pink/white), elevation map (grayscale), the planned path (green), the goal (white circle), and the robot state (black rectangle).
+
+ +We provide rosbags that you can use to run the example script. Place them in the `rosbags` folder. Download here: [rosbag 1](https://drive.google.com/file/d/1zV0f3NbPuyewwbHUlrcuboj-PMrPixwG/view?usp=sharing), [rosbag 2](https://drive.google.com/file/d/1BPsypv83_W5EtcodyV2W75BSK3rVE3mX/view?usp=sharing) + +1. **Start ROS core:** + ```bash + roscore + ``` +2. **Run the planner node:** + ```bash + python3 examples/ROS/example.py + ``` +3. **Play a rosbag or run your robot simulation** to publish the required topics. + ``` + cd examples/ROS/rosbags/ + rosbag play hound_95.bag + ``` + +## Visualization +- The planner will open an OpenCV window showing the costmap, elevation map, path, goal, and robot state. +- Path and goal are also published as ROS topics for use in RViz. diff --git a/examples/ROS/example.py b/examples/ROS/example.py new file mode 100644 index 0000000..4b1658d --- /dev/null +++ b/examples/ROS/example.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 +""" +ROS wrapper for IGHA* planner (single-class version). +- Can be used live (with ROS subscribers) or offline (by calling callbacks with rosbag messages). +- Maintains internal state and publishes planned paths. +- Clean, modern, and easy to adapt for both online and offline use. +""" +import rospy +import numpy as np +import torch +import yaml +from nav_msgs.msg import Path, Odometry +from mavros_msgs.msg import WaypointList +from grid_map_msgs.msg import GridMap +from visualization_msgs.msg import MarkerArray +from utils import * +import cv2 +import time +import os +from ighastar.scripts.common_utils import create_planner +from typing import Any, Optional, Tuple + + +class PlannerNode: + """ + ROS node wrapper for IGHAStar planner. + Handles all ROS communication, map/state/waypoint management, and planning logic. + Can be used live or with rosbag replay by calling the callbacks directly. + """ + + def __init__(self, config: dict) -> None: + # --- Planner config --- + planner_cfg = config["Planner_config"] + self.map_res = planner_cfg["experiment_info_default"]["node_info"]["map_res"] + self.expansion_limit = planner_cfg["experiment_info_default"]["max_expansions"] + self.hysteresis = planner_cfg["experiment_info_default"]["hysteresis"] + self.planner = create_planner(planner_cfg) + # --- Costmap parameters --- + self.blur_kernel = config["blur_kernel"] + self.costmap_cosine_thresh = np.cos(np.radians(config["lethal_slope"])) + self.wp_radius = config["wp_radius"] + # --- IO variables --- + self.state = np.zeros(9) + self.local_waypoints = None + self.global_pos = None + self.map_init = False + self.height_index = None + self.bitmap = None + self.offset = None + self.map_center = None + self.map_size_px = None + # --- ROS publishers --- + topics = config["topics"] + self.path_pub = rospy.Publisher(topics["path"], Path, queue_size=1, latch=True) + self.marker_pub = rospy.Publisher( + topics["goal_marker"], MarkerArray, queue_size=1 + ) + self.diagnostics_pub = rospy.Publisher( + topics["diagnostics"], DiagnosticArray, queue_size=1 + ) + # --- ROS subscribers --- + rospy.Subscriber( + topics["elevation_map"], GridMap, self.map_callback, queue_size=1 + ) + rospy.Subscriber(topics["odom"], Odometry, self.odom_callback, queue_size=1) + rospy.Subscriber( + topics["waypoints"], WaypointList, self.waypoint_callback, queue_size=1 + ) + rospy.Subscriber( + topics["global_position"], NavSatFix, self.global_pos_callback, queue_size=1 + ) + self.plan_loop() + + def map_callback(self, grid_map: GridMap) -> None: + """Update the map from the GridMap message.""" + self.grid_map = grid_map + if self.height_index is None or self.layers is None: + self.layers = self.grid_map.layers + self.height_index = self.layers.index("elevation") + cent = self.grid_map.info.pose.position + self.map_center = np.array([cent.x, cent.y, cent.z]) + matrix = self.grid_map.data[self.height_index] + self.heightmap = map_from_gridmap(matrix) + if np.any(np.isnan(self.heightmap)): + self.map_init = False + else: + # this function creates the bitmap (costmap, heightmap) + # and gets the offset of the map + self.bitmap, self.offset = process_grid_map( + self.heightmap, + lethalmap=None, + map_res=self.map_res, + blur_kernel=self.blur_kernel, + costmap_cosine_thresh=self.costmap_cosine_thresh, + ) + self.map_init = True + + def plan( + self, start_state: np.ndarray, goal_: np.ndarray, stop: bool = False + ) -> Tuple[Optional[np.ndarray], bool, int, float, np.ndarray]: + """Plan a path from the current state to the goal.""" + if self.bitmap is None: + return None, False, 0, 0.0, goal_ + bitmap = torch.clone(self.bitmap) + # this function effectively projects the start and goal to the + # bitmap if it is not already inside the bitmap bounds + start, goal = start_goal_logic( + bitmap, + self.map_res, + start_state, + goal_, + self.map_center, + self.offset, + stop=stop, + ) + now = time.perf_counter() + # this function does the actual planning + # it takes in the start, goal, bitmap, expansion limit, hysteresis, and ignore goal validity + # it returns a boolean indicating if the goal was reached, the number of expansions, the time taken, and the output goal + # the output goal is the goal projected to the bitmap bounds + # Map convention: x is east, y is north. 0,0 is the bottom left corner of the map. + # if the map was shown using cv2.imshow, the origin would be the top left corner. + # Start and goal are relative to the bottom left corner of the map. + success = self.planner.search( + start, goal, bitmap, self.expansion_limit, self.hysteresis, True + ) + time_taken = time.perf_counter() - now + # this function gets the profiler info from the planner + ( + avg_successor_time, + avg_goal_check_time, + avg_overhead_time, + avg_g_update_time, + switches, + max_level_profile, + Q_v_size, + expansion_counter, + expansion_list, + ) = self.planner.get_profiler_info() + output_goal = goal[:2] - (self.offset - self.map_center[:2]) + if success: + path = self.planner.get_best_path().numpy() + path = np.flip(path, axis=0) + path[..., :2] -= self.offset + path[..., :2] += self.map_center[:2] + return path, True, expansion_counter, time_taken, output_goal + else: + return None, False, expansion_counter, time_taken, output_goal + + def odom_callback(self, msg: Odometry) -> None: + """Update local position from Odometry message.""" + self.state = obtain_state(msg, self.state) + + def global_pos_callback(self, msg: Any) -> None: + """Update global position from NavSatFix message.""" + self.global_pos = msg + + def waypoint_callback(self, msg: WaypointList) -> None: + """Update local waypoints from WaypointList message.""" + print("Got waypoints") + self.waypoint_list = msg.waypoints + while self.global_pos is None or self.state is None: + time.sleep(1) + gps_origin = calcposLLH( + self.global_pos.latitude, + self.global_pos.longitude, + -self.state[0], + -self.state[1], + ) + self.local_waypoints = get_local_frame_waypoints(self.waypoint_list, gps_origin) + + def plan_loop(self) -> None: + """Main loop for planning.""" + rate = rospy.Rate(20) + while not rospy.is_shutdown(): + rate.sleep() + if ( + self.local_waypoints is not None + and self.state is not None + and self.bitmap is not None + and len(self.local_waypoints) + ): + self.local_waypoints = np.array(self.local_waypoints) + plan_state = np.array( + [ + self.state[0], + self.state[1], + self.state[5], + np.linalg.norm(self.state[6:8]), + ] + ) + goal = self.local_waypoints[0] + path, success, expansions, time_taken, output_goal = self.plan( + plan_state, goal, stop=len(self.local_waypoints) == 0 + ) + expansions_per_second = max(expansions / time_taken, 1000) + self.expansion_limit = int(expansions_per_second * 0.5) + if success: + publish_path(path, self.path_pub) + publish_goal(output_goal, self.marker_pub) + # Visualize map with path + + diagnostic_publisher( + success, + expansions, + time_taken, + self.hysteresis, + self.diagnostics_pub, + ) + display_map = visualize_map_with_path( + self.bitmap[..., 0].numpy(), + self.bitmap[..., 1].numpy(), + path, + output_goal, + plan_state, + self.wp_radius, + self.map_center, + 480, + 7.5 / self.map_res, + ) + cv2.imshow("planner_vis", display_map) + cv2.waitKey(1) + # reduce length of local waypoints: + dist = np.linalg.norm(self.local_waypoints[0][:2] - plan_state[:2]) + if dist < self.wp_radius: + if len(self.local_waypoints) > 1: + self.local_waypoints = self.local_waypoints[1:] + + +if __name__ == "__main__": + rospy.init_node("ighastar_planner_node") + config_name = "ros_example.yml" + config_path = os.path.join(os.path.dirname(__file__), "Configs", config_name) + with open(config_path) as f: + Config = yaml.safe_load(f) + node = PlannerNode(Config) + rospy.spin() diff --git a/examples/ROS/requirements.txt b/examples/ROS/requirements.txt new file mode 100644 index 0000000..d3860a6 --- /dev/null +++ b/examples/ROS/requirements.txt @@ -0,0 +1,4 @@ +torch +opencv-python +numpy +pyyaml \ No newline at end of file diff --git a/examples/ROS/utils.py b/examples/ROS/utils.py new file mode 100644 index 0000000..4a3c33e --- /dev/null +++ b/examples/ROS/utils.py @@ -0,0 +1,405 @@ +import numpy as np +import cv2 +import torch +from tf.transformations import euler_from_quaternion, quaternion_from_euler +from torch.utils.cpp_extension import load +from nav_msgs.msg import Path +import rospy +from geometry_msgs.msg import PoseStamped +from visualization_msgs.msg import Marker, MarkerArray +from grid_map_msgs.msg import GridMap +from geometry_msgs.msg import Quaternion +from sensor_msgs.msg import NavSatFix +from mavros_msgs.msg import WaypointList +from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue +from nav_msgs.msg import Odometry +import os +from typing import Any, Optional, List, Tuple + + +def generate_normal(elev: np.ndarray, k: int = 3) -> np.ndarray: + dzdx = -cv2.Sobel(elev, cv2.CV_32F, 1, 0, ksize=k) + dzdy = -cv2.Sobel(elev, cv2.CV_32F, 0, 1, ksize=k) + dzdz = np.ones_like(elev) + normal = np.stack((dzdx, dzdy, dzdz), axis=2) + norm = np.linalg.norm(normal, axis=2, keepdims=True) + normal = normal / norm + return normal + + +def map_from_gridmap(matrix: Any) -> np.ndarray: + return np.float32( + cv2.flip( + np.reshape( + matrix.data, + (matrix.layout.dim[1].size, matrix.layout.dim[0].size), + order="F", + ).T, + -1, + ) + ) + + +def generate_costmap_from_BEVmap( + BEV_lethal: np.ndarray, + BEV_normal: np.ndarray, + costmap_cosine_thresh: float = np.cos(np.radians(60)), +) -> np.ndarray: + """Generate a costmap from BEV lethal and normal maps.""" + dot_product = BEV_normal[:, :, 2] + angle_cost = np.where(dot_product >= costmap_cosine_thresh, 255, 0).astype( + np.float32 + ) + costmap = 255.0 * (1 - BEV_lethal[..., 0]) + costmap = np.minimum(costmap, angle_cost) + costmap[costmap < 255.0] = 0.0 + return costmap + + +def process_grid_map( + heightmap: np.ndarray, + lethalmap: Optional[np.ndarray] = None, + map_res: float = 0.1, + blur_kernel: int = 0, + costmap_cosine_thresh: float = np.cos(np.radians(60)), +) -> Tuple[torch.Tensor, np.ndarray]: + if lethalmap is None: + lethalmap = np.zeros_like(heightmap, dtype=np.float32) + normal = generate_normal(heightmap) + costmap = generate_costmap_from_BEVmap(lethalmap, normal, costmap_cosine_thresh) + if blur_kernel != 0: + costmap = cv2.GaussianBlur(costmap, (blur_kernel, blur_kernel), 0) + costmap[costmap < 255.0] = 0.0 + bitmap = torch.ones( + (heightmap.shape[0], heightmap.shape[1], 2), dtype=torch.float32 + ) + bitmap[..., 0] = torch.from_numpy(costmap) + bitmap[..., 1] = torch.from_numpy(heightmap) + offset = map_res * np.array(bitmap.shape[:2]) * 0.5 + return bitmap, offset + + +def clip_goal_to_map( + bitmap: torch.Tensor, + map_res: float = 0.25, + start: Optional[np.ndarray] = None, + goal: Optional[np.ndarray] = None, + num_samples: int = 200, +) -> np.ndarray: + H, W = bitmap.shape[:2] + H *= map_res + W *= map_res + + x0, y0 = start[:2] + x1, y1 = goal[:2] + + # Generate t in [0, 1] + t_vals = torch.linspace(0, 1, num_samples, device=bitmap.device) + + # Line points: start + t * (goal - start) + dx = x1 - x0 + dy = y1 - y0 + xs = x0 + t_vals * dx + ys = y0 + t_vals * dy + + # Check which are inside map bounds + valid = (xs >= 0) & (xs < W) & (ys >= 0) & (ys < H) + if torch.any(valid): + last_valid_idx = torch.where(valid)[0][-1] + goal[0] = xs[last_valid_idx] + goal[1] = ys[last_valid_idx] + else: + # All points are out of bounds; snap to start + goal[0] = x0 + goal[1] = y0 + return goal + + +def obtain_state(odom: Odometry, state: np.ndarray) -> np.ndarray: + ## obtain the state from the odometry and imu messages: + quaternion = ( + odom.pose.pose.orientation.x, + odom.pose.pose.orientation.y, + odom.pose.pose.orientation.z, + odom.pose.pose.orientation.w, + ) + rpy = euler_from_quaternion(quaternion) + state[0] = odom.pose.pose.position.x + state[1] = odom.pose.pose.position.y + state[2] = odom.pose.pose.position.z + state[3] = rpy[0] + state[4] = rpy[1] + state[5] = rpy[2] + state[6] = odom.twist.twist.linear.x + state[7] = odom.twist.twist.linear.y + state[8] = odom.twist.twist.linear.z + return state + + +def publish_goal(goal: np.ndarray, marker_pub: Any) -> None: + marker_array = MarkerArray() + marker = Marker() + marker.header.frame_id = "odom" + marker.id = 0 + marker.type = marker.SPHERE + marker.action = marker.ADD + marker.scale.x = 4.0 + marker.scale.y = 4.0 + marker.scale.z = 0.1 + marker.color.a = 1.0 + marker.color.r = 1.0 + marker.color.g = 0.0 + marker.color.b = 0.0 + marker.pose.orientation.w = 1.0 + marker.pose.position.x = goal[0] + marker.pose.position.y = goal[1] + marker.pose.position.z = 1 + marker_array.markers.append(marker) + marker_pub.publish(marker_array) + + +def publish_path(path: np.ndarray, path_publisher: Any) -> None: + ros_path = Path() + ros_path.header.stamp = rospy.Time.now() + ros_path.header.frame_id = "map" # or whatever your fixed frame is + + for waypoint in path: + pose = PoseStamped() + pose.header.stamp = rospy.Time.now() + pose.header.frame_id = "map" + + pose.pose.position.x = waypoint[0] + pose.pose.position.y = waypoint[1] + pose.pose.position.z = 1 # would be nice to get the extended state here... + + # Convert theta to quaternion + quat = quaternion_from_euler(0, 0, waypoint[2]) + pose.pose.orientation = Quaternion(*quat) + + ros_path.poses.append(pose) + + path_publisher.publish(ros_path) + + +def diagnostic_publisher( + status: Any, + expansion_counter: int, + time_taken: float, + hysteresis: float, + diagnostics_pub: Any, +) -> None: + diagnostics_array = DiagnosticArray() + diagnostics_status = DiagnosticStatus() + diagnostics_status.name = "planner" + diagnostics_status.level = status + diagnostics_status.values.append( + KeyValue(key="Expansion Count", value=str(expansion_counter)) + ) + diagnostics_status.values.append( + KeyValue(key="Time Taken (s)", value=f"{time_taken:.4f}") + ) + diagnostics_status.values.append( + KeyValue(key="Expansions/second", value=f"{expansion_counter/time_taken:.4f}") + ) + diagnostics_status.values.append(KeyValue(key="Hysteresis", value=f"{hysteresis}")) + + diagnostics_array.status.append(diagnostics_status) + diagnostics_pub.publish(diagnostics_array) + + +def set_headings(local_waypoints: np.ndarray) -> np.ndarray: + target_Vhat = np.zeros_like(local_waypoints) # 0 out everything + for i in range(1, len(local_waypoints) - 1): # all points except first and last + V_prev = local_waypoints[i] - local_waypoints[i - 1] + V_next = local_waypoints[i + 1] - local_waypoints[i] + target_Vhat[i] = (V_next + V_prev) / np.linalg.norm(V_next + V_prev) + target_Vhat[0] = local_waypoints[1] - local_waypoints[0] + target_Vhat[0] /= np.linalg.norm(target_Vhat[0]) + target_Vhat[-1] = local_waypoints[-1] - local_waypoints[-2] + target_Vhat[-1] /= np.linalg.norm(target_Vhat[-1]) + + waypoints = np.zeros((len(local_waypoints), 4)) + waypoints[:, :2] = local_waypoints[:, :2] + waypoints[:, 2] = np.arctan2(target_Vhat[:, 1], target_Vhat[:, 0]) + waypoints[:, 3] = 2.0 + return waypoints + + +earthRadius = 6378145.0 +DEG_TO_RAD = 0.01745329252 +RAD_TO_DEG = 57.2957795131 + + +def calcposLLH(lat: float, lon: float, dX: float, dY: float) -> Tuple[float, float]: + lat += dY / (earthRadius * DEG_TO_RAD) + lon += dX / (earthRadius * np.cos(lat * DEG_TO_RAD) * DEG_TO_RAD) + return lat, lon + + +def calcposNED( + lat: float, lon: float, latReference: float, lonReference: float +) -> Tuple[float, float]: + Y = earthRadius * (lat - latReference) * DEG_TO_RAD + X = ( + earthRadius + * np.cos(latReference * DEG_TO_RAD) + * (lon - lonReference) + * DEG_TO_RAD + ) + return X, Y + + +def start_goal_logic( + bitmap: torch.Tensor, + map_res: float, + start_state: np.ndarray, + goal: np.ndarray, + map_center: np.ndarray, + offset: np.ndarray, + stop: bool = False, +) -> Tuple[torch.Tensor, torch.Tensor]: + start = np.zeros(4, dtype=np.float32) + goal_ = np.zeros(4, dtype=np.float32) + start[:2] = start_state[:2] + offset - map_center[:2] + goal_[:2] = goal[:2] + offset - map_center[:2] + goal_ = clip_goal_to_map(bitmap, map_res=map_res, start=start, goal=goal_) + start[2] = start_state[2] + start[3] = start_state[3] + goal_[2] = goal[2] + goal_[3] = 0.0 if stop else 2.0 + return torch.from_numpy(start).to(dtype=torch.float32), torch.from_numpy(goal_).to( + dtype=torch.float32 + ) + + +def get_local_frame_waypoints( + waypoint_list: Any, gps_origin: Tuple[float, float] +) -> np.ndarray: + local_waypoints = [] + for waypoint in waypoint_list: + if waypoint.frame != 3: + continue + lat = waypoint.x_lat + lon = waypoint.y_long + # generate X,Y locations using calcposNED and append to path + X, Y = calcposNED(lat, lon, gps_origin[0], gps_origin[1]) + local_waypoints.append(np.array([X, Y, 0])) + local_waypoints = set_headings(np.array(local_waypoints)) + return local_waypoints + + +def visualize_map_with_path( + costmap: np.ndarray, + elevation_map: np.ndarray, + path: Optional[np.ndarray], + goal: np.ndarray, + state: np.ndarray, + wp_radius: float, + map_center: np.ndarray, + map_size: int, + resolution_inv: float, +) -> np.ndarray: + """ + Visualize the costmap and elevation map with the path, goal, and optionally the current state. + Args: + costmap: 2D numpy array (uint8) + elevation_map: 2D numpy array (float) + path: Nx4 or Nx2 numpy array (path points) + goal: (2,) or (4,) array-like (goal position) + state: (N,) array-like, current state (x, y, theta, ...) + wp_radius: float, waypoint radius (for goal circle) + map_center: (2,) or (3,) array-like, map center offset + map_size: int, output image size in pixels + resolution_inv: float, 1.0 / map resolution + Returns: + costmap_img: RGB image (np.uint8) for display with OpenCV + """ + # Resize maps to output size + costmap = cv2.resize(costmap, (map_size, map_size)) + elevation_map = cv2.resize(elevation_map, (map_size, map_size)) + + # Colorize costmap: white for 255, pink for others + costmap_color = np.clip(costmap, 0, 255).astype(np.uint8) + pink = np.array([255, 105, 180], dtype=np.uint8) + white = np.array([255, 255, 255], dtype=np.uint8) + color_map = np.zeros((map_size, map_size, 3), dtype=np.uint8) + mask_white = costmap_color == 255 + mask_pink = ~mask_white + color_map[mask_white] = white + color_map[mask_pink] = pink + + # Colorize elevation map and blend with costmap (where costmap is white) + elev_norm = np.clip((elevation_map + 4) / 8, 0, 1) + elev_uint8 = (elev_norm * 255).astype(np.uint8) + elev_color = np.stack([elev_uint8] * 3, axis=-1) + display_img = color_map.copy() + display_img[mask_white] = elev_color[mask_white] + + # Draw goal (convert to local map frame) + goal_disp = np.array(goal[:2]) - np.array(map_center[:2]) + goal_x = int( + np.clip((goal_disp[0] * resolution_inv) + map_size // 2, 0, map_size - 1) + ) + goal_y = int( + np.clip((goal_disp[1] * resolution_inv) + map_size // 2, 0, map_size - 1) + ) + radius = max(2, int(wp_radius * resolution_inv)) + cv2.circle(display_img, (goal_x, goal_y), radius, (255, 255, 255), 2) + + # Draw path + if path is not None and len(path) > 0: + path_disp = np.copy(path) + path_disp[..., :2] -= np.array(map_center[:2]) + path_X = np.clip( + (path_disp[..., 0] * resolution_inv) + map_size // 2, 0, map_size - 1 + ).astype(int) + path_Y = np.clip( + (path_disp[..., 1] * resolution_inv) + map_size // 2, 0, map_size - 1 + ).astype(int) + car_width_px = max(1, int(0.15 * resolution_inv)) + if path.shape[1] > 3: + velocity = path[..., 3] + velocity_norm = (velocity - np.min(velocity)) / ( + np.max(velocity) - np.min(velocity) + 1e-6 + ) + velocity_color = np.clip((velocity_norm * 255), 0, 255).astype(int) + else: + velocity_color = np.full(len(path_X), 128, dtype=int) + for i in range(len(path_X) - 1): + cv2.line( + display_img, + (path_X[i], path_Y[i]), + (path_X[i + 1], path_Y[i + 1]), + (0, int(velocity_color[i]), 0), + car_width_px, + ) + + # Draw current state as a rectangle (if provided) + if state is not None and len(state) >= 3: + x = state[0] - map_center[0] + y = state[1] - map_center[1] + theta = np.pi - state[2] + x_px = int(x * resolution_inv + map_size // 2) + y_px = int(y * resolution_inv + map_size // 2) + car_width_px = max(2, int(0.29 * resolution_inv)) + car_height_px = max(2, int(0.15 * resolution_inv)) + half_width = car_width_px // 2 + half_height = car_height_px // 2 + corners = np.array( + [ + [x_px - half_width, y_px - half_height], + [x_px + half_width, y_px - half_height], + [x_px + half_width, y_px + half_height], + [x_px - half_width, y_px + half_height], + ], + dtype=np.int32, + ) + rotation_matrix = cv2.getRotationMatrix2D((x_px, y_px), np.degrees(theta), 1.0) + rotated_corners = cv2.transform(np.array([corners]), rotation_matrix)[0] + cv2.polylines( + display_img, [rotated_corners], isClosed=True, color=(0, 0, 0), thickness=2 + ) + + # Flip for visualization + display_img = cv2.flip(display_img, 0) + return display_img