Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .github/workflows/pylint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pylint
# If you have requirements:
# pip install -r requirements.txt
pip install -r requirements.txt

- name: Run pylint
run: |
pylint $(git ls-files '*.py')
pylint --rcfile=.pylintrc $(find mavctl -name "*.py")
32 changes: 32 additions & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[MASTER]
ignore=tests,bin

[MESSAGES CONTROL]
# Disable warnings that are common in pymavlink / high-level API
disable=
missing-module-docstring,
missing-function-docstring,
missing-class-docstring,
invalid-name,
broad-except,
attribute-defined-outside-init,
protected-access,
R0902

[TYPECHECK]
ignored-modules=
pymavlink
pymavlink.*

[BASIC]
good-names=i,j,k,
x,y,z,
lat,lon,alt,
msg,mav,seq

[DESIGN]
max-args=10
max-locals=20
max-branches=15
max-statements=60
min-public-methods=0
File renamed without changes.
7 changes: 7 additions & 0 deletions mavctl/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .messages.navigator import Navigator

__all__ = [
"Navigator"
]

__version__ = "0.0.1"
18 changes: 18 additions & 0 deletions mavctl/_internal/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import logging

def get_logger(name: str, level=logging.DEBUG) -> logging.Logger:
"""Returns a configured logger instance."""
logger = logging.getLogger(name)
logger.setLevel(level)

if not logger.handlers:
ch = logging.StreamHandler()
ch.setLevel(level)
formatter = logging.Formatter(
fmt="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
ch.setFormatter(formatter)
logger.addHandler(ch)

return logger
81 changes: 81 additions & 0 deletions mavctl/_internal/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from math import radians, degrees, cos, sin, atan2, sqrt
from mavctl.messages.location import LocationLocal


def distance_to_target(target1, target2) -> LocationLocal:
"""
Returns the distance vector to the target.

Args:
target1: Current Position of Vehicle (LocationLocal)
target2: Target Position (LocationLocal)
"""
dx = target2.x - target1.north
dy = target2.y - target1.east
dz = target2.z - target1.down

return LocationLocal(dx, dy, dz)


def check_target_reached(target1, target2, tolerance: float = 0.5) -> bool:
"""
Checks if the vehicle has reached the target within a certain tolerance.

Args:
target1: Current Position of the vehicle (LocationLocal)
target2: Target Position (LocationLocal)
tolerance: Distance tolerance in meters
"""
delta = distance_to_target(target1, target2)
delta_length = sqrt(delta.north ** 2 + delta.east ** 2 + delta.down ** 2)

return delta_length < tolerance


def LatLon_to_Distance(point, origin) -> float:
"""
Computes the distance between two global coordinates using Haversine formula.

Args:
point: Target LocationGlobal
origin: Origin LocationGlobal

Returns:
Distance in meters
"""
R = 6378137 # Earth radius in meters

origin_lat_rad = radians(origin.lat)
origin_lon_rad = radians(origin.lon)
point_lat_rad = radians(point.lat)
point_lon_rad = radians(point.lon)

delta_lat = point_lat_rad - origin_lat_rad
delta_lon = point_lon_rad - origin_lon_rad

a = sin(delta_lat / 2) ** 2 + cos(origin_lat_rad) * cos(point_lat_rad) * sin(delta_lon / 2) ** 2
c = 2 * atan2(sqrt(a), sqrt(1 - a))

return R * c


def Heading(point1, point2) -> float:
"""
Computes the heading from point1 to point2 in degrees (0-360).

Args:
point1: Start LocationGlobal
point2: End LocationGlobal

Returns:
Heading in degrees
"""
lat1_rad = radians(point1.lat)
lat2_rad = radians(point2.lat)
delta_lon = radians(point2.lon - point1.lon)

x = sin(delta_lon) * cos(lat2_rad)
y = cos(lat1_rad) * sin(lat2_rad) - sin(lat1_rad) * cos(lat2_rad) * cos(delta_lon)

angle = atan2(x, y)
return (degrees(angle) + 360) % 360
124 changes: 64 additions & 60 deletions mavctl/connect/conn.py
Original file line number Diff line number Diff line change
@@ -1,90 +1,94 @@

from pymavlink import mavutil
import time
import threading
from typing import Optional

from pymavlink import mavutil

from mavctl.connect.heartbeat import HeartbeatManager
from mavctl.messages.navigator import Navigator
from mavctl.exceptions import ConnError
from mavctl._internal.logger import get_logger

LOGGER = get_logger(__name__)

class Connect:

def __init__(self,
"""Handles MAVLink connections and heartbeat monitors"""

def __init__(self,
ip: str = "udp:127.0.0.1:14552",
baud: int = 57600,
heartbeat_timeout = None):
heartbeat_timeout: Optional[int] = None):

"""
Initialize a MAVLink connection and run the heartbeat manager on the connection

Args:
ip: Connection String
baud: Baud rate
heartbeat_timeout: Optional timeout in seconds for waiting for heartbeats
"""
self.master = self.connect(ip = ip, baud = baud, heartbeat_timeout = heartbeat_timeout)

self._heartbeat_manager = HeartbeatManager(self.master)
self._heartbeat_manager.start()


def send_heartbeat(self, interval: int = 0.25):
self.ip = ip

try:
while True:
self.master.mav.heartbeat_send(
type=mavutil.mavlink.MAV_TYPE_GCS,
autopilot=mavutil.mavlink.MAV_AUTOPILOT_INVALID,
base_mode=0,
custom_mode=0,
system_status=mavutil.mavlink.MAV_STATE_ACTIVE
)
time.sleep(interval)
def connect(self, ip: str = "udp:127.0.0.1:14552",
baud: int = 57600,
heartbeat_timeout: Optional[int] = None) -> mavutil.mavlink_connection:
"""
Function to connect to the MAVLink device

except Exception as e:
raise Exception("MAVCTL ERROR: Failed to send heartbeat!: ", e)

def recv_heartbeat(self, interval: int = 1):

try:
while True:
msg_recv = self.master.recv_match(type="HEARTBEAT", blocking=False)
Args:
ip: Connection String
baud: Baud rate
heartbeat_timeout: Optional timeout in seconds for waiting for heartbeats

# Create disconnect handling here...
time.sleep(interval)
Returns:
A mavutil.mavlink_connection instance

except Exception as e:
raise Exception("MAVCTL ERROR: Failed to send heartbeat!: ", e)

def heartbeat_start(self):
self.send_heartbeat_thread.start()
self.recv_heartbeat_thread.start()

def heartbeat_kill(self):
self._stop_event.set()
self.send_heartbeat_thread.join(timeout=2)
self.recv_heartbeat_thread.join(timeout=2)

def disconnect(self):
self.heartbeat_kill()
self.master.close()
print("MAVCTL: Disconnecting!")

# Function to connect to mavlink
def connect(self, ip: str = "udp:127.0.0.1:14552", # Default IP and port for the UAARG Autopilot Simulator
baud: int = 57600, # Default Baud Rate for the PixHawk, Add support for this later
#source_system = 255, # Add support for this later
#source_component = 0, # Add support for this later
heartbeat_timeout = None # Automatically assume that there is no timeout
):
Raises:
ConnError: If heartbeat or the connection fails
"""
try:
# NOTE: Using default parameters built into pymavlink, look into how this can be customized
master = mavutil.mavlink_connection(ip, baud)
msg_recv = master.recv_match(type="HEARTBEAT", blocking=False)
while not msg_recv:
i = 0
if heartbeat_timeout != None:
if heartbeat_timeout > 0:
if heartbeat_timeout is not None:
if heartbeat_timeout > 0:
i += 1
if heartbeat_timeout % 5:
print("MAVCTL: Waiting for heartbeat ...")
elif heartbeat_timeout == 0:
raise Exception("MAVCTL ERROR: Heartbeat not found!")
raise ConnError("MAVCTL ERROR: Heartbeat not found!")
print("MAVCTL: Waiting for heartbeat ...")
msg_recv = master.recv_match(type="HEARTBEAT", blocking=False)
time.sleep(1)
print("MAVCTL: Connected at: ", ip)
LOGGER.info("MAVCTL: Connected at %s", ip)
return master

except Exception as e:
raise ConnError("MAVCTL ERROR: Failed to receive heartbeat!: ", e) from e

def disconnect(self, timeout: float = 5, device: Optional[str] = None) -> bool:
"""Graceful Disconnect from the MAVLink Device

Returns:
Bool saying whether or not the disconnect was successful

Raises ConnErorr if disconnect fails"""

LOGGER.info("MAVCTL: Disconnecting ...")
try:

self._heartbeat_manager.stop()
start_time = time.monotonic()
while self._heartbeat_manager.get_connection_status():
if (time.monotonic() - start_time) > timeout:
LOGGER.warning("MAVCTL: Heartbeat manager did not stop within timeout")
return False
self.master.close()
LOGGER.info("MAVCTL: Successfully Disconnected from %s", device)
return True

except Exception as e:
raise Exception("MAVCTL ERROR: Failed to receive heartbeat!: ", e)
raise ConnError("MAVCTL ERROR: Disconnect failed", e) from e
4 changes: 2 additions & 2 deletions mavctl/connect/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ def _send_heartbeat(self):
base_mode=0,
custom_mode=0,
system_status=mavutil.mavlink.MAV_STATE_ACTIVE
)
time.sleep(self.heartbeat_timeout)
)
time.sleep(self.heartbeat_timeout)

def get_connection_status(self) -> bool:
"""Get the current connection status."""
Expand Down
9 changes: 9 additions & 0 deletions mavctl/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""
Used to create specific exceptions to help with debugging
"""

class MAVCTLError(Exception):
"""Base exception for all mavctl errors"""

class ConnError(MAVCTLError):
"""Exception to handle Connection Errors"""
26 changes: 0 additions & 26 deletions mavctl/messages/advanced.py

This file was deleted.

Loading