Skip to content

ROS example added #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ ighastar/scripts/test_case_generation.py
examples/ros_kinodynamic_example.py
examples/ROS/
examples/BeamNG/
ighastar/src/IGHAStarClass.py
ighastar/src/Nodes.py
# Python
__pycache__/
*.py[cod]
Expand Down
Binary file added Content/ROS/ros_example.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
39 changes: 39 additions & 0 deletions examples/ROS/Configs/ros_example.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
Planner_config:
experiment_info_default:
resolution: 2.0
epsilon: [2.0, 2.0, 1.57, 5.0] # ate, cte, heading, vel
tolerance: 0.125
max_level: 5
division_factor: 2 # Example division factor
max_expansions: 500
hysteresis: 100
node_info:
node_type: "kinodynamic"
length: 0.3
width: 0.25
map_res: 0.25
dt: 0.05
timesteps: 5
step_size: 1.0
steering_list: [-15.0, -5.0, 0.0, 5.0, 15.0] # in degrees.
throttle_list: [-4.0, -2.0, 0, 2.0, 4.0]
del_theta: 90.0 # in degrees
del_vel: 1.0
RI: 0.5
max_vert_acc: 5.0
min_vel: 0.0 # don't go in reverse.
max_vel: 2.0
max_theta: 30.0 # in degrees
gear_switch_time: 0.0
topics:
elevation_map: "/elevation_mapping/elevation_map_cropped_cv2"
odom: "/mavros/local_position/odom"
waypoints: "/mavros/mission/waypoints"
path: "/planner_test/path"
diagnostics: "/planner_test/diagnostic"
goal_marker: "/planner_test/goal_marker"
global_position: "/mavros/global_position/global"

blur_kernel: 0
lethal_slope: 60
wp_radius: 2.0
78 changes: 78 additions & 0 deletions examples/ROS/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# IGHAStar ROS Example

This folder contains a ROS-based example for the IGHA* path planner. The code here is really an example script to demonstrate how one would use the planner in the loop on their robot.
It demonstrates how to integrate the IGHA* planner with ROS topics for maps, odometry, waypoints, and visualization.
For simplicity (not needing to fiddle with rviz), there is a built in visualization in the example.
It is however not a "ros-package"; one can use the example shown here as a stencil to incorporate IGHA* into their navigation stack.


## Prerequisites
- ROS Noetic
- Python 3
- IGHAStar C++/CUDA requirements (met with pip install -e .)

## Setup (confirm these instructions)
1. **Install ROS dependencies:**
Assuming IGHA* has already been installed,
```bash
# Install ROS packages for message types
sudo apt-get update
sudo apt-get install ros-noetic-grid-map-msgs \
ros-noetic-mavros-msgs \
ros-noetic-nav-msgs \
ros-noetic-geometry-msgs \
ros-noetic-sensor-msgs \
ros-noetic-visualization-msgs \
ros-noetic-diagnostic-msgs \
ros-noetic-tf

# Check for any missing dependencies
rosdep install --from-paths src --ignore-src -r -y
```

2. **Verify installation:**
```bash
# Test that all message types are available
python3 -c "
import rospy
from nav_msgs.msg import Path, Odometry
from mavros_msgs.msg import WaypointList
from grid_map_msgs.msg import GridMap
from visualization_msgs.msg import MarkerArray
from sensor_msgs.msg import NavSatFix
from diagnostic_msgs.msg import DiagnosticArray
from geometry_msgs.msg import PoseStamped, Quaternion
print('All ROS message types imported successfully!')
"
```

## Configuration
- Edit `Configs/ros_example.yml` to set planner parameters, topic names, and costmap settings.

## Running the Example


<figure align="center">
<img src="../../Content/ROS/ros_example.png" alt="ROS Example Visualization" width="600"/>
<figcaption><b>Fig. 1:</b> Example ROS planner visualization. The window shows the costmap (pink/white), elevation map (grayscale), the planned path (green), the goal (white circle), and the robot state (black rectangle).</figcaption>
</figure>

We provide rosbags that you can use to run the example script. Place them in the `rosbags` folder. Download here: [rosbag 1](https://drive.google.com/file/d/1zV0f3NbPuyewwbHUlrcuboj-PMrPixwG/view?usp=sharing), [rosbag 2](https://drive.google.com/file/d/1BPsypv83_W5EtcodyV2W75BSK3rVE3mX/view?usp=sharing)

1. **Start ROS core:**
```bash
roscore
```
2. **Run the planner node:**
```bash
python3 examples/ROS/example.py
```
3. **Play a rosbag or run your robot simulation** to publish the required topics.
```
cd examples/ROS/rosbags/
rosbag play hound_95.bag
```

## Visualization
- The planner will open an OpenCV window showing the costmap, elevation map, path, goal, and robot state.
- Path and goal are also published as ROS topics for use in RViz.
237 changes: 237 additions & 0 deletions examples/ROS/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
#!/usr/bin/env python3
"""
ROS wrapper for IGHA* planner (single-class version).
- Can be used live (with ROS subscribers) or offline (by calling callbacks with rosbag messages).
- Maintains internal state and publishes planned paths.
- Clean, modern, and easy to adapt for both online and offline use.
"""
import rospy
import numpy as np
import torch
import yaml
from nav_msgs.msg import Path, Odometry
from mavros_msgs.msg import WaypointList
from grid_map_msgs.msg import GridMap
from visualization_msgs.msg import MarkerArray
from utils import *
import cv2
import time
import os
from ighastar.scripts.common_utils import create_planner
from typing import Any, Optional, Tuple


class PlannerNode:
"""
ROS node wrapper for IGHAStar planner.
Handles all ROS communication, map/state/waypoint management, and planning logic.
Can be used live or with rosbag replay by calling the callbacks directly.
"""

def __init__(self, config: dict) -> None:
# --- Planner config ---
planner_cfg = config["Planner_config"]
self.map_res = planner_cfg["experiment_info_default"]["node_info"]["map_res"]
self.expansion_limit = planner_cfg["experiment_info_default"]["max_expansions"]
self.hysteresis = planner_cfg["experiment_info_default"]["hysteresis"]
self.planner = create_planner(planner_cfg)
# --- Costmap parameters ---
self.blur_kernel = config["blur_kernel"]
self.costmap_cosine_thresh = np.cos(np.radians(config["lethal_slope"]))
self.wp_radius = config["wp_radius"]
# --- IO variables ---
self.state = np.zeros(9)
self.local_waypoints = None
self.global_pos = None
self.map_init = False
self.height_index = None
self.bitmap = None
self.offset = None
self.map_center = None
self.map_size_px = None
# --- ROS publishers ---
topics = config["topics"]
self.path_pub = rospy.Publisher(topics["path"], Path, queue_size=1, latch=True)
self.marker_pub = rospy.Publisher(
topics["goal_marker"], MarkerArray, queue_size=1
)
self.diagnostics_pub = rospy.Publisher(
topics["diagnostics"], DiagnosticArray, queue_size=1
)
# --- ROS subscribers ---
rospy.Subscriber(
topics["elevation_map"], GridMap, self.map_callback, queue_size=1
)
rospy.Subscriber(topics["odom"], Odometry, self.odom_callback, queue_size=1)
rospy.Subscriber(
topics["waypoints"], WaypointList, self.waypoint_callback, queue_size=1
)
rospy.Subscriber(
topics["global_position"], NavSatFix, self.global_pos_callback, queue_size=1
)
self.plan_loop()

def map_callback(self, grid_map: GridMap) -> None:
"""Update the map from the GridMap message."""
self.grid_map = grid_map
if self.height_index is None or self.layers is None:
self.layers = self.grid_map.layers
self.height_index = self.layers.index("elevation")
cent = self.grid_map.info.pose.position
self.map_center = np.array([cent.x, cent.y, cent.z])
matrix = self.grid_map.data[self.height_index]
self.heightmap = map_from_gridmap(matrix)
if np.any(np.isnan(self.heightmap)):
self.map_init = False
else:
# this function creates the bitmap (costmap, heightmap)
# and gets the offset of the map
self.bitmap, self.offset = process_grid_map(
self.heightmap,
lethalmap=None,
map_res=self.map_res,
blur_kernel=self.blur_kernel,
costmap_cosine_thresh=self.costmap_cosine_thresh,
)
self.map_init = True

def plan(
self, start_state: np.ndarray, goal_: np.ndarray, stop: bool = False
) -> Tuple[Optional[np.ndarray], bool, int, float, np.ndarray]:
"""Plan a path from the current state to the goal."""
if self.bitmap is None:
return None, False, 0, 0.0, goal_
bitmap = torch.clone(self.bitmap)
# this function effectively projects the start and goal to the
# bitmap if it is not already inside the bitmap bounds
start, goal = start_goal_logic(
bitmap,
self.map_res,
start_state,
goal_,
self.map_center,
self.offset,
stop=stop,
)
now = time.perf_counter()
# this function does the actual planning
# it takes in the start, goal, bitmap, expansion limit, hysteresis, and ignore goal validity
# it returns a boolean indicating if the goal was reached, the number of expansions, the time taken, and the output goal
# the output goal is the goal projected to the bitmap bounds
# Map convention: x is east, y is north. 0,0 is the bottom left corner of the map.
# if the map was shown using cv2.imshow, the origin would be the top left corner.
# Start and goal are relative to the bottom left corner of the map.
success = self.planner.search(
start, goal, bitmap, self.expansion_limit, self.hysteresis, True
)
time_taken = time.perf_counter() - now
# this function gets the profiler info from the planner
(
avg_successor_time,
avg_goal_check_time,
avg_overhead_time,
avg_g_update_time,
switches,
max_level_profile,
Q_v_size,
expansion_counter,
expansion_list,
) = self.planner.get_profiler_info()
output_goal = goal[:2] - (self.offset - self.map_center[:2])
if success:
path = self.planner.get_best_path().numpy()
path = np.flip(path, axis=0)
path[..., :2] -= self.offset
path[..., :2] += self.map_center[:2]
return path, True, expansion_counter, time_taken, output_goal
else:
return None, False, expansion_counter, time_taken, output_goal

def odom_callback(self, msg: Odometry) -> None:
"""Update local position from Odometry message."""
self.state = obtain_state(msg, self.state)

def global_pos_callback(self, msg: Any) -> None:
"""Update global position from NavSatFix message."""
self.global_pos = msg

def waypoint_callback(self, msg: WaypointList) -> None:
"""Update local waypoints from WaypointList message."""
print("Got waypoints")
self.waypoint_list = msg.waypoints
while self.global_pos is None or self.state is None:
time.sleep(1)
gps_origin = calcposLLH(
self.global_pos.latitude,
self.global_pos.longitude,
-self.state[0],
-self.state[1],
)
self.local_waypoints = get_local_frame_waypoints(self.waypoint_list, gps_origin)

def plan_loop(self) -> None:
"""Main loop for planning."""
rate = rospy.Rate(20)
while not rospy.is_shutdown():
rate.sleep()
if (
self.local_waypoints is not None
and self.state is not None
and self.bitmap is not None
and len(self.local_waypoints)
):
self.local_waypoints = np.array(self.local_waypoints)
plan_state = np.array(
[
self.state[0],
self.state[1],
self.state[5],
np.linalg.norm(self.state[6:8]),
]
)
goal = self.local_waypoints[0]
path, success, expansions, time_taken, output_goal = self.plan(
plan_state, goal, stop=len(self.local_waypoints) == 0
)
expansions_per_second = max(expansions / time_taken, 1000)
self.expansion_limit = int(expansions_per_second * 0.5)
if success:
publish_path(path, self.path_pub)
publish_goal(output_goal, self.marker_pub)
# Visualize map with path

diagnostic_publisher(
success,
expansions,
time_taken,
self.hysteresis,
self.diagnostics_pub,
)
display_map = visualize_map_with_path(
self.bitmap[..., 0].numpy(),
self.bitmap[..., 1].numpy(),
path,
output_goal,
plan_state,
self.wp_radius,
self.map_center,
480,
7.5 / self.map_res,
)
cv2.imshow("planner_vis", display_map)
cv2.waitKey(1)
# reduce length of local waypoints:
dist = np.linalg.norm(self.local_waypoints[0][:2] - plan_state[:2])
if dist < self.wp_radius:
if len(self.local_waypoints) > 1:
self.local_waypoints = self.local_waypoints[1:]


if __name__ == "__main__":
rospy.init_node("ighastar_planner_node")
config_name = "ros_example.yml"
config_path = os.path.join(os.path.dirname(__file__), "Configs", config_name)
with open(config_path) as f:
Config = yaml.safe_load(f)
node = PlannerNode(Config)
rospy.spin()
4 changes: 4 additions & 0 deletions examples/ROS/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
torch
opencv-python
numpy
pyyaml
Loading