diff --git a/docs/user/index.rst b/docs/user/index.rst index 5afaee3a6..580b9c612 100644 --- a/docs/user/index.rst +++ b/docs/user/index.rst @@ -24,6 +24,7 @@ RocketPy's User Guide :caption: Special Case Simulations Compare Flights Class + Parachute Triggers (Acceleration-Based) Deployable Payload Air Brakes Example ../notebooks/sensors.ipynb diff --git a/docs/user/parachute_triggers.rst b/docs/user/parachute_triggers.rst new file mode 100644 index 000000000..91563d10c --- /dev/null +++ b/docs/user/parachute_triggers.rst @@ -0,0 +1,329 @@ +Acceleration-Based Parachute Triggers +====================================== + +RocketPy supports advanced parachute deployment logic using acceleration data +from simulated IMU (Inertial Measurement Unit) sensors. This enables realistic +avionics algorithms that mimic real-world flight computers. + +Overview +-------- + +Traditional parachute triggers rely on altitude and velocity. Acceleration-based +triggers provide additional capabilities: + +- **Motor burnout detection**: Detect thrust termination via sudden deceleration +- **Apogee detection**: Use acceleration and velocity together for precise apogee +- **Freefall detection**: Identify ballistic coasting phases +- **Liftoff detection**: Confirm motor ignition via high acceleration + +These triggers can optionally include sensor noise to simulate realistic IMU +behavior, making simulations more representative of actual flight conditions. + +Built-in Triggers +----------------- + +RocketPy provides four built-in acceleration-based triggers: + +Motor Burnout Detection +~~~~~~~~~~~~~~~~~~~~~~~~ + +Detects when the motor stops producing thrust by monitoring sudden drops in +acceleration magnitude. + +.. code-block:: python + + from rocketpy import Parachute + + drogue = Parachute( + name="Drogue", + cd_s=1.0, + trigger="burnout", # Built-in trigger + sampling_rate=100, + lag=1.5 + ) + +**Detection criteria:** +- Vertical acceleration < -8.0 m/s² (end of thrust phase), OR +- Total acceleration magnitude < 2.0 m/s² (coasting detected) +- Rocket must be above 5m altitude and ascending (prevents false triggers at launch) + +Apogee Detection +~~~~~~~~~~~~~~~~ + +Detects apogee using both near-zero vertical velocity and negative vertical +acceleration. + +.. code-block:: python + + main = Parachute( + name="Main", + cd_s=10.0, + trigger="apogee_acc", # Acceleration-based apogee + sampling_rate=100, + lag=0.5 + ) + +**Detection criteria:** +- Absolute vertical velocity < 1.0 m/s +- Vertical acceleration < -0.1 m/s² (downward) + +Freefall Detection +~~~~~~~~~~~~~~~~~~ + +Detects free-fall by monitoring very low total acceleration (near gravitational +acceleration only). + +.. code-block:: python + + drogue = Parachute( + name="Drogue", + cd_s=1.0, + trigger="freefall", + sampling_rate=100, + lag=1.0 + ) + +**Detection criteria:** +- Total acceleration magnitude < 11.5 m/s² +- Rocket descending (vz < -0.2 m/s) +- Altitude > 5m (prevents ground-level false triggers) + +Liftoff Detection +~~~~~~~~~~~~~~~~~ + +Detects motor ignition via high total acceleration. + +.. code-block:: python + + test_parachute = Parachute( + name="Test", + cd_s=0.5, + trigger="liftoff", + sampling_rate=100, + lag=0.0 + ) + +**Detection criteria:** +- Total acceleration magnitude > 15.0 m/s² + +Custom Triggers +--------------- + +You can create custom triggers that use acceleration data: + +.. code-block:: python + + def custom_trigger(pressure, height, state_vector, u_dot): + """ + Custom trigger using acceleration data. + + Parameters + ---------- + pressure : float + Atmospheric pressure in Pa (with optional noise) + height : float + Height above ground level in meters (with optional noise) + state_vector : array + [x, y, z, vx, vy, vz, e0, e1, e2, e3, wx, wy, wz] + u_dot : array + Derivative: [vx, vy, vz, ax, ay, az, e0_dot, ...] + Accelerations are at indices [3:6] + + Returns + ------- + bool + True to trigger parachute deployment + """ + # Extract acceleration components (m/s²) + ax = u_dot[3] + ay = u_dot[4] + az = u_dot[5] + + # Calculate total acceleration magnitude + total_acc = (ax**2 + ay**2 + az**2)**0.5 + + # Custom logic: deploy if total acceleration < 5 m/s² while descending + vz = state_vector[5] + return total_acc < 5.0 and vz < -10.0 + + # Use custom trigger + parachute = Parachute( + name="Custom", + cd_s=2.0, + trigger=custom_trigger, + sampling_rate=100, + lag=1.0 + ) + +Sensor and Acceleration Triggers +--------------------------------- + +Triggers can also accept sensor data alongside acceleration: + +.. code-block:: python + + def advanced_trigger(pressure, height, state_vector, sensors, u_dot): + """ + Advanced trigger using both sensors and acceleration. + + Parameters + ---------- + sensors : list + List of sensor objects attached to the rocket + u_dot : array + State derivative including accelerations + + Returns + ------- + bool + """ + # Access sensor measurements + if len(sensors) > 0: + imu_reading = sensors[0].measurement + + # Use acceleration data + az = u_dot[5] + + # Combine sensor and acceleration logic + return az < -5.0 and imu_reading > threshold + + parachute = Parachute( + name="Advanced", + cd_s=1.5, + trigger=advanced_trigger, + sampling_rate=100 + ) + +Adding Acceleration Noise +-------------------------- + +To simulate realistic IMU behavior, you can add noise to acceleration data: + +.. code-block:: python + + from rocketpy import Flight + + flight = Flight( + rocket=my_rocket, + environment=env, + rail_length=5.2, + inclination=85, + heading=0, + acceleration_noise_function=lambda: np.random.normal(0, 0.5, 3) + ) + +The ``acceleration_noise_function`` returns a 3-element array ``[noise_x, noise_y, noise_z]`` +that is added to the acceleration components before passing to the trigger function. + +**Example with time-correlated noise:** + +.. code-block:: python + + class NoiseGenerator: + def __init__(self, stddev=0.5, correlation=0.9): + self.stddev = stddev + self.correlation = correlation + self.last_noise = np.zeros(3) + + def __call__(self): + # Time-correlated noise (AR(1) process) + white_noise = np.random.normal(0, self.stddev, 3) + self.last_noise = (self.correlation * self.last_noise + + np.sqrt(1 - self.correlation**2) * white_noise) + return self.last_noise + + flight = Flight( + rocket=my_rocket, + environment=env, + rail_length=5.2, + inclination=85, + heading=0, + acceleration_noise_function=NoiseGenerator(stddev=0.3, correlation=0.95) + ) + +Performance Considerations +-------------------------- + +Computing acceleration (``u_dot``) requires evaluating the equations of motion, +which adds computational cost. RocketPy optimizes this by: + +1. **Lazy evaluation**: ``u_dot`` is only computed if the trigger actually needs it +2. **Metadata detection**: The wrapper inspects trigger signatures to determine requirements +3. **Caching**: Derivative evaluations are reused when possible + +**Trigger signature detection:** + +- 3 parameters ``(p, h, y)``: Legacy trigger, no ``u_dot`` computed +- 4 parameters with ``u_dot``: Only acceleration computed +- 4 parameters with ``sensors``: Only sensors passed +- 5 parameters: Both sensors and acceleration provided + +Best Practices +-------------- + +1. **Choose appropriate sampling rates**: 50-200 Hz is typical for flight computers +2. **Add realistic noise**: Real IMUs have noise; simulate it for validation +3. **Test edge cases**: Verify triggers work at low altitudes, high speeds, etc. +4. **Use built-in triggers**: They handle edge cases (NaN, Inf) automatically +5. **Document custom triggers**: Include detection criteria in docstrings + +Example: Complete Dual-Deploy System +------------------------------------- + +.. code-block:: python + + from rocketpy import Rocket, Parachute, Flight, Environment + import numpy as np + + # Environment and rocket setup + env = Environment(latitude=32.99, longitude=-106.97, elevation=1400) + env.set_atmospheric_model(type="standard_atmosphere") + + my_rocket = Rocket(...) # Define your rocket + + # Drogue parachute: Deploy at motor burnout + drogue = Parachute( + name="Drogue", + cd_s=1.0, + trigger="burnout", + sampling_rate=100, + lag=1.5, + noise=(0, 8.3, 0.5) # Pressure sensor noise + ) + my_rocket.add_parachute(drogue) + + # Main parachute: Deploy at 800m using custom trigger + def main_deploy_trigger(pressure, height, state_vector, u_dot): + """Deploy main at 800m while descending with positive vertical acceleration.""" + vz = state_vector[5] + az = u_dot[5] + return height < 800 and vz < -5 and az > -15 + + main = Parachute( + name="Main", + cd_s=10.0, + trigger=main_deploy_trigger, + sampling_rate=100, + lag=0.5, + noise=(0, 8.3, 0.5) + ) + my_rocket.add_parachute(main) + + # Flight with IMU noise simulation + flight = Flight( + rocket=my_rocket, + environment=env, + rail_length=5.2, + inclination=85, + heading=0, + acceleration_noise_function=lambda: np.random.normal(0, 0.3, 3) + ) + + flight.all_info() + +See Also +-------- + +- :doc:`Parachute Class Reference ` +- :doc:`Flight Simulation ` +- :doc:`Sensors and Controllers ` diff --git a/rocketpy/rocket/parachute.py b/rocketpy/rocket/parachute.py index f0bf86f67..1fbc2b973 100644 --- a/rocketpy/rocket/parachute.py +++ b/rocketpy/rocket/parachute.py @@ -8,6 +8,146 @@ from ..prints.parachute_prints import _ParachutePrints +def detect_motor_burnout(_pressure, height, state_vector, u_dot): + """Detect motor burnout by sudden drop in acceleration. + + Returns True when vertical acceleration becomes significantly negative + (indicating end of propulsion phase) OR when total acceleration drops below + a threshold indicating coasting/free-fall has begun. + """ + try: + if u_dot is None or len(u_dot) < 6: + return False + + ax = float(u_dot[3]) + ay = float(u_dot[4]) + az = float(u_dot[5]) + + # Defensive checks for NaN/Inf + if not all(np.isfinite([ax, ay, az])): + return False + + total_acc = np.sqrt(ax * ax + ay * ay + az * az) + if not np.isfinite(total_acc): + return False + + # Additional safety: ignore spurious low-accel readings at t~0 by + # requiring the rocket to be above a small altitude and still ascending + vz = float(state_vector[5]) if len(state_vector) > 5 else 0 + if not np.isfinite(vz): + return False + + if height < 5.0 or vz <= 0.5: + return False + + # Burnout detected when: + # 1. Vertical acceleration becomes very negative (end of thrust phase) + # 2. OR total acceleration drops below 2.0 m/s² (coasting detected) + return az < -8.0 or total_acc < 2.0 + except (ValueError, TypeError, IndexError): + return False + + +def detect_apogee_acceleration(_pressure, _height, state_vector, u_dot): + """Detect apogee using near-zero vertical velocity and negative vertical accel. + + Apogee occurs when the rocket reaches its highest point, characterized by + vertical velocity approaching zero and negative (downward) acceleration. + """ + try: + if state_vector is None or u_dot is None: + return False + if len(state_vector) < 6 or len(u_dot) < 6: + return False + + vz = float(state_vector[5]) + az = float(u_dot[5]) + if not all(np.isfinite([vz, az])): + return False + + # Slightly more permissive thresholds to avoid spurious misses + return abs(vz) < 1.0 and az < -0.1 + except (ValueError, TypeError, IndexError): + return False + + +def detect_freefall(_pressure, height, state_vector, u_dot): + """Detect free-fall when total acceleration magnitude is low. + + Free-fall is characterized by acceleration magnitude close to gravitational + acceleration (approximately -g in the vertical direction), or when the rocket + is in a ballistic coasting phase with minimal thrust or drag effects. + """ + try: + if u_dot is None or len(u_dot) < 6: + return False + + ax = float(u_dot[3]) + ay = float(u_dot[4]) + az = float(u_dot[5]) + if not all(np.isfinite([ax, ay, az])): + return False + + total_acc = np.sqrt(ax * ax + ay * ay + az * az) + if not np.isfinite(total_acc): + return False + + # Require the rocket to be descending and above a small altitude to + # avoid false positives before launch. + vz = float(state_vector[5]) if len(state_vector) > 5 else 0 + if not np.isfinite(vz): + return False + + if height < 5.0 or vz >= -0.2: + return False + + # More sensitive threshold: detect free-fall at lower acceleration + return total_acc < 11.5 + except (ValueError, TypeError, IndexError): + return False + + +def detect_liftoff(_pressure, _height, _state_vector, u_dot): + """Detect liftoff by high total acceleration. + + Liftoff is characterized by a sudden increase in acceleration as the motor + ignites and begins producing thrust. + """ + try: + if u_dot is None or len(u_dot) < 6: + return False + + ax = float(u_dot[3]) + ay = float(u_dot[4]) + az = float(u_dot[5]) + if not all(np.isfinite([ax, ay, az])): + return False + + total_acc = np.sqrt(ax * ax + ay * ay + az * az) + if not np.isfinite(total_acc): + return False + + return total_acc > 15.0 + except (ValueError, TypeError, IndexError): + return False + + +def altitude_trigger_factory(target_altitude, require_descent=True): + """Return a trigger that deploys when altitude <= target_altitude. + + If require_descent is True, also require vertical velocity negative + (descending) to avoid firing during ascent. + """ + + def trigger(_pressure, height, state_vector, _u_dot=None): + vz = float(state_vector[5]) + if require_descent: + return (height <= target_altitude) and (vz < 0) + return height <= target_altitude + + return trigger + + class Parachute: """Keeps information of the parachute, which is modeled as a hemispheroid. @@ -24,41 +164,56 @@ class Parachute: This parameter defines the trigger condition for the parachute ejection system. It can be one of the following: - - A callable function that takes three arguments: - 1. Freestream pressure in pascals. - 2. Height in meters above ground level. - 3. The state vector of the simulation, which is defined as: + - A callable function that can take 3, 4, or 5 arguments: + + **3 arguments** (legacy): + 1. Freestream pressure in pascals. + 2. Height in meters above ground level. + 3. The state vector: ``[x, y, z, vx, vy, vz, e0, e1, e2, e3, wx, wy, wz]`` - `[x, y, z, vx, vy, vz, e0, e1, e2, e3, wx, wy, wz]`. + **4 arguments** (sensors OR acceleration): + 1. Freestream pressure in pascals. + 2. Height in meters above ground level. + 3. The state vector. + 4. Either: + - ``sensors``: List of sensor objects attached to the rocket, OR + - ``u_dot``: State derivative including accelerations at indices [3:6] - 4. A list of sensors that are attached to the rocket. The most recent - measurements of the sensors are provided with the - ``sensor.measurement`` attribute. The sensors are listed in the same - order as they are added to the rocket. + **5 arguments** (sensors AND acceleration): + 1. Freestream pressure in pascals. + 2. Height in meters above ground level. + 3. The state vector. + 4. ``sensors``: List of sensor objects. + 5. ``u_dot``: State derivative with accelerations ``[vx, vy, vz, ax, ay, az, ...]`` - The function should return ``True`` if the parachute ejection system - should be triggered and False otherwise. The function will be called - according to the specified sampling rate. + The function should return ``True`` to trigger deployment, ``False`` otherwise. + The function will be called according to the specified sampling rate. - A float value, representing an absolute height in meters. In this case, the parachute will be ejected when the rocket reaches this height - above ground level. + above ground level while descending. - - The string "apogee" which triggers the parachute at apogee, i.e., - when the rocket reaches its highest point and starts descending. + - A string for built-in triggers: + - ``"apogee"``: Legacy apogee detection (velocity-based) + - ``"apogee_acc"``: Apogee detection using acceleration data + - ``"burnout"``: Motor burnout detection via acceleration drop + - ``"freefall"``: Free-fall detection via low total acceleration + - ``"liftoff"``: Liftoff detection via high acceleration Parachute.triggerfunc : function Trigger function created from the trigger used to evaluate the trigger condition for the parachute ejection system. It is a callable function - that takes three arguments: Freestream pressure in Pa, Height above - ground level in meters, and the state vector of the simulation. The - returns ``True`` if the parachute ejection system should be triggered + that takes five arguments: Freestream pressure in Pa, Height above + ground level in meters, the state vector, sensors list, and u_dot. + Returns ``True`` if the parachute ejection system should be triggered and ``False`` otherwise. - .. note: + .. note:: The function will be called according to the sampling rate specified. + For performance, ``u_dot`` is only computed if the trigger signature + indicates it is needed. Parachute.sampling_rate : float Sampling rate, in Hz, for the trigger function. @@ -219,50 +374,113 @@ def __init__( self.__evaluate_trigger_function(trigger) - def __evaluate_trigger_function(self, trigger): + def __evaluate_trigger_function(self, trigger): # pylint: disable=too-many-statements """This is used to set the triggerfunc attribute that will be used to interact with the Flight class. + + Notes + ----- + The resulting triggerfunc always has signature (p, h, y, sensors, u_dot) + so Flight can pass both sensors and u_dot when needed. """ # pylint: disable=unused-argument, function-redefined - # The parachute is deployed by a custom function - if callable(trigger): - # work around for having added sensors to parachute triggers - # to avoid breaking changes - triggerfunc = trigger - sig = signature(triggerfunc) - if len(sig.parameters) == 3: - def triggerfunc(p, h, y, sensors): - return trigger(p, h, y) + # Helper to wrap any callable to the internal (p, h, y, sensors, u_dot) API + def _make_wrapper(fn): + sig = signature(fn) + params = list(sig.parameters.keys()) - self.triggerfunc = triggerfunc + # detect if user function expects acceleration-like argument + expects_udot = any( + name.lower() in ("u_dot", "udot", "acc", "acceleration") + for name in params[3:] + ) + # detect if user function expects sensors argument + expects_sensors = any(name.lower() == "sensors" for name in params[3:]) + + def wrapper(p, h, y, sensors, u_dot): + # Support 3, 4, and 5-arg user functions + num_params = len(sig.parameters) + if num_params == 3: + return fn(p, h, y) + if num_params == 4: + # Check which 4th arg to pass + fourth_param = params[3].lower() + if fourth_param in ("u_dot", "udot", "acc", "acceleration"): + return fn(p, h, y, u_dot) + else: + return fn(p, h, y, sensors) + if num_params >= 5: + # Pass both sensors and u_dot + return fn(p, h, y, sensors, u_dot) + # fallback: try calling with available args + try: + return fn(p, h, y, sensors, u_dot) + except TypeError: + try: + return fn(p, h, y, u_dot) + except TypeError: + try: + return fn(p, h, y, sensors) + except TypeError: + return fn(p, h, y) + + # attach metadata so Flight can decide whether to compute u_dot + wrapper._expects_udot = expects_udot + wrapper._expects_sensors = expects_sensors + wrapper._wrapped_fn = fn + return wrapper + + # Callable provided by user + if callable(trigger): + self.triggerfunc = _make_wrapper(trigger) + return + + # Numeric altitude trigger + if isinstance(trigger, (int, float)): - elif isinstance(trigger, (int, float)): - # The parachute is deployed at a given height - def triggerfunc(p, h, y, sensors): # pylint: disable=unused-argument + def triggerfunc(p, h, y, sensors, u_dot): # pylint: disable=unused-argument # p = pressure considering parachute noise signal # h = height above ground level considering parachute noise signal # y = [x, y, z, vx, vy, vz, e0, e1, e2, e3, w1, w2, w3] return y[5] < 0 and h < trigger + triggerfunc._expects_udot = False + triggerfunc._expects_sensors = True self.triggerfunc = triggerfunc - - elif trigger.lower() == "apogee": - # The parachute is deployed at apogee - def triggerfunc(p, h, y, sensors): # pylint: disable=unused-argument - # p = pressure considering parachute noise signal - # h = height above ground level considering parachute noise signal - # y = [x, y, z, vx, vy, vz, e0, e1, e2, e3, w1, w2, w3] + return + + # String: map to built-in triggers + if isinstance(trigger, str): + key = trigger.strip().lower() + mapping = { + "apogee_acc": detect_apogee_acceleration, + "burnout": detect_motor_burnout, + "freefall": detect_freefall, + "liftoff": detect_liftoff, + } + if key in mapping: + self.triggerfunc = _make_wrapper(mapping[key]) + return + + # Special case: "apogee" (legacy support) + if isinstance(trigger, str) and trigger.lower() == "apogee": + + def triggerfunc(p, h, y, sensors, u_dot): # pylint: disable=unused-argument return y[5] < 0 + triggerfunc._expects_udot = False + triggerfunc._expects_sensors = True self.triggerfunc = triggerfunc - - else: - raise ValueError( - f"Unable to set the trigger function for parachute '{self.name}'. " - + "Trigger must be a callable, a float value or the string 'apogee'. " - + "See the Parachute class documentation for more information." - ) + return + + # If we reach this point, the trigger is invalid + raise ValueError( + f"Unable to set the trigger function for parachute '{self.name}'. " + + "Trigger must be a callable, a float value or one of the strings " + + "('apogee','burnout','freefall','liftoff'). " + + "See the Parachute class documentation for more information." + ) def __str__(self): """Returns a string representation of the Parachute class. diff --git a/rocketpy/simulation/flight.py b/rocketpy/simulation/flight.py index a7e0374b2..ff20e2d20 100644 --- a/rocketpy/simulation/flight.py +++ b/rocketpy/simulation/flight.py @@ -1,4 +1,5 @@ # pylint: disable=too-many-lines +import inspect import math import warnings from copy import deepcopy @@ -96,6 +97,12 @@ class Flight: trigger function evaluation points and then interpolation is used to calculate them and feed the triggers. Can greatly improve run time in some cases. + Note + ---- + Calculating the derivative `u_dot` inside parachute trigger checks will + cause additional calls to the equations of motion (extra physics + evaluations). This increases CPU cost but enables realistic avionics + algorithms that rely on accelerometer data. Flight.terminate_on_apogee : bool Whether to terminate simulation when rocket reaches apogee. Flight.solver : scipy.integrate.LSODA @@ -503,6 +510,7 @@ def __init__( # pylint: disable=too-many-arguments,too-many-statements name="Flight", equations_of_motion="standard", ode_solver="LSODA", + acceleration_noise_function=None, simulation_mode="6 DOF", weathercock_coeff=0.0, ): @@ -627,6 +635,13 @@ def __init__( # pylint: disable=too-many-arguments,too-many-statements self.equations_of_motion = equations_of_motion self.simulation_mode = simulation_mode self.ode_solver = ode_solver + # Function that returns accelerometer noise vector [ax_noise, ay_noise, az_noise] + # It should be a callable that returns an array-like of length 3. + self.acceleration_noise_function = ( + acceleration_noise_function + if acceleration_noise_function is not None + else (lambda: np.zeros(3)) + ) self.weathercock_coeff = weathercock_coeff # Controller initialization @@ -726,11 +741,14 @@ def __simulate(self, verbose): ) = self.__calculate_and_save_pressure_signals( parachute, node.t, self.y_sol[2] ) - if parachute.triggerfunc( + if self._evaluate_parachute_trigger( + parachute, noisy_pressure, height_above_ground_level, self.y_sol, self.sensors, + phase.derivative, + self.t, ): # Remove parachute from flight parachutes self.parachutes.remove(parachute) @@ -946,6 +964,7 @@ def __check_and_handle_parachute_triggers( height_above_ground_level, self.y_sol, self.sensors, + None, # u_dot not computed in non-overshoot path ): continue # Check next parachute @@ -1355,6 +1374,7 @@ def __check_overshootable_parachute_triggers( height_above_ground_level, overshootable_node.y_sol, self.sensors, + None, # u_dot not computed in overshoot path ): continue # Check next parachute @@ -1454,6 +1474,80 @@ def __calculate_and_save_pressure_signals(self, parachute, t, z): return noisy_pressure, height_above_ground_level + def _evaluate_parachute_trigger( + self, parachute, pressure, height, y, sensors, derivative_func, t + ): + """Evaluate parachute trigger, passing both sensors and u_dot to wrapper. + + This helper preserves backward compatibility with existing trigger + signatures. The wrapper in Parachute always expects (p, h, y, sensors, u_dot) + and Flight computes u_dot only when the trigger requests it (optimization). + + Parameters + ---------- + parachute : Parachute + Parachute object. + pressure : float + Noisy pressure value passed to trigger. + height : float + Height above ground level passed to trigger. + y : array + State vector at evaluation time. + sensors : list + Sensors list passed to trigger. + derivative_func : callable + Function to compute derivatives: derivative_func(t, y) + t : float + Time at which to evaluate derivatives. + + Returns + ------- + bool + True if trigger condition met, False otherwise. + """ + triggerfunc = parachute.triggerfunc + + # Check wrapper metadata for expectations + expects_udot = getattr(triggerfunc, "_expects_udot", False) + expects_sensors = getattr(triggerfunc, "_expects_sensors", True) + + # Fallback: inspect original trigger signature if metadata missing + if not expects_udot and not expects_sensors: + trig_original = getattr(parachute, "trigger", None) + if callable(trig_original): + try: + sig = inspect.signature(trig_original) + params = list(sig.parameters.keys()) + acc_names = {"u_dot", "udot", "acc", "acceleration"} + expects_udot = any(p.lower() in acc_names for p in params[3:]) + expects_sensors = any(p.lower() == "sensors" for p in params[3:]) + except (ValueError, TypeError): + expects_udot = False + expects_sensors = True + + # Compute u_dot only if needed (performance optimization) + u_dot = None + if expects_udot: + try: + u_dot = np.array(derivative_func(t, y), dtype=float) + # Inject accelerometer noise if configured + if hasattr(self, "acceleration_noise_function"): + try: + noise = np.asarray(self.acceleration_noise_function()) + if noise.size == 3: + # u_dot layout: [vx, vy, vz, ax, ay, az, ...] + u_dot[3:6] = u_dot[3:6] + noise + except (ValueError, TypeError): + # ignore noise errors and continue + pass + except (ValueError, TypeError, RuntimeError): + # If u_dot computation fails, leave as None + u_dot = None + + # Call the wrapper with both sensors and u_dot + # The wrapper will decide which args to pass to the user's function + return triggerfunc(pressure, height, y, sensors, u_dot) + def __init_solution_monitors(self): # Initialize solution monitors self.out_of_rail_time = 0 diff --git a/tests/unit/test_parachute_trigger_acceleration.py b/tests/unit/test_parachute_trigger_acceleration.py new file mode 100644 index 000000000..5301bc451 --- /dev/null +++ b/tests/unit/test_parachute_trigger_acceleration.py @@ -0,0 +1,482 @@ +"""Unit tests for acceleration-based parachute triggers. + +Tests cover trigger function signatures, noise injection, built-in triggers, +and edge cases for realistic avionics simulation. +""" + +import numpy as np +import pytest + +from rocketpy.rocket.parachute import ( + Parachute, + detect_apogee_acceleration, + detect_freefall, + detect_liftoff, + detect_motor_burnout, +) +from rocketpy.simulation.flight import Flight + + +class TestTriggerSignatures: + """Test various trigger function signatures and detection.""" + + def test_trigger_receives_u_dot_and_noise(self): + """Test that triggers receive u_dot with injected noise. + + Verifies acceleration data is computed and noise is injected before + passing to user trigger functions. + """ + + # Arrange + def derivative_func(_t, _y): + return np.array([0, 0, 0, 1.0, 2.0, 3.0, 0, 0, 0, 0, 0, 0, 0]) + + recorded = {} + + def user_trigger(_p, _h, _y, u_dot): + recorded["u_dot"] = np.array(u_dot) + return True + + parachute = Parachute( + name="test", + cd_s=1.0, + trigger=user_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = lambda: np.array([0.1, -0.2, 0.3]) + + # Act + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=10.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=0.0, + ) + + # Assert + assert res is True + assert "u_dot" in recorded + assert np.allclose(recorded["u_dot"][3:6], np.array([1.1, 1.8, 3.3])) + + def test_trigger_with_sensors_and_u_dot(self): + """Test trigger with both sensors and u_dot parameters. + + Verifies wrapper correctly passes both sensors and acceleration data + to triggers expecting 5 parameters. + """ + + # Arrange + def derivative_func(_t, _y): + return np.array([0, 0, 0, -1.0, -2.0, -3.0, 0, 0, 0, 0, 0, 0, 0]) + + recorded = {} + + def user_trigger(_p, _h, _y, sensors, u_dot): + recorded["sensors"] = sensors + recorded["u_dot"] = np.array(u_dot) + return False + + parachute = Parachute( + name="test2", + cd_s=1.0, + trigger=user_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = lambda: np.array([0.0, 0.0, 0.0]) + + # Act + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=5.0, + y=np.zeros(13), + sensors=["s1"], + derivative_func=derivative_func, + t=1.234, + ) + + # Assert + assert res is False + assert recorded["sensors"] == ["s1"] + assert np.allclose(recorded["u_dot"][3:6], np.array([-1.0, -2.0, -3.0])) + + def test_legacy_trigger_does_not_compute_u_dot(self): + """Test that legacy 3-parameter triggers don't trigger u_dot computation. + + Ensures backward compatibility by skipping expensive derivative + computation when not needed. + """ + + # Arrange + def derivative_func(_t, _y): + raise RuntimeError("derivative should not be called for legacy triggers") + + called = {} + + def legacy_trigger(_p, _h, _y): + called["ok"] = True + return True + + parachute = Parachute( + name="legacy", + cd_s=1.0, + trigger=legacy_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = lambda: np.zeros(3) + + # Act & Assert + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=0.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=0.0, + ) + + assert res is True + assert called.get("ok", False) is True + + +class TestBuiltInTriggers: + """Test built-in acceleration-based trigger functions.""" + + def test_detect_apogee_acceleration_at_peak(self): + """Test apogee detection when velocity is near zero with negative acceleration. + + Apogee detection requires vertical velocity close to zero and + downward acceleration (negative az). + """ + # Arrange + state_vector = [0, 0, 1000, 0, 0, 0.3, 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0, 0, -1.0, 0, 0, 0, 0, 0, 0, 0] + + # Act + result = detect_apogee_acceleration(101325, 1000, state_vector, u_dot) + + # Assert + assert bool(result) is True + + def test_detect_apogee_acceleration_ascending(self): + """Test apogee detection returns False while still ascending. + + Rocket must have near-zero vertical velocity to trigger apogee. + """ + # Arrange + state_vector = [0, 0, 500, 0, 0, 50, 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0, 0, -9.8, 0, 0, 0, 0, 0, 0, 0] + + # Act + result = detect_apogee_acceleration(101325, 500, state_vector, u_dot) + + # Assert + assert bool(result) is False + + def test_detect_motor_burnout_high_deceleration(self): + """Test burnout detection with sudden high negative acceleration. + + Motor burnout is indicated by total acceleration magnitude dropping + significantly. + """ + # Arrange + state_vector = [0, 0, 200, 0, 0, 100, 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0, 0, -12.0, 0, 0, 0, 0, 0, 0, 0] + + # Act + result = detect_motor_burnout(101325, 200, state_vector, u_dot) + + # Assert + assert bool(result) is True + + def test_detect_motor_burnout_low_total_acceleration(self): + """Test burnout detection when total acceleration falls below threshold. + + Low total acceleration (< 1.5 m/s²) with descending rocket indicates + motor burnout. + """ + # Arrange + state_vector = [0, 0, 200, 0, 0, 100, 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0.5, 0.5, 0.5, 0, 0, 0, 0, 0, 0, 0] + + # Act + result = detect_motor_burnout(101325, 200, state_vector, u_dot) + + # Assert + assert bool(result) is True + + def test_detect_freefall_near_zero_acceleration(self): + """Test freefall detection with near-zero total acceleration. + + Freefall occurs when total acceleration is very small (sensor noise level). + """ + # Arrange + state_vector = [0, 0, 500, 0, 0, -20, 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0.1, 0.1, 0.1, 0, 0, 0, 0, 0, 0, 0] + + # Act + result = detect_freefall(101325, 500, state_vector, u_dot) + + # Assert + assert bool(result) is True + + def test_detect_liftoff_high_acceleration(self): + """Test liftoff detection with high upward acceleration. + + Liftoff requires total acceleration > 15 m/s² and low altitude. + """ + # Arrange + state_vector = [0, 0, 2, 0, 0, 5, 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0, 0, 20.0, 0, 0, 0, 0, 0, 0, 0] + + # Act + result = detect_liftoff(101325, 2, state_vector, u_dot) + + # Assert + assert bool(result) is True + + def test_detect_liftoff_on_pad(self): + """Test liftoff detection returns False while on launch pad. + + No liftoff until acceleration threshold is exceeded. + """ + # Arrange + state_vector = [0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + + # Act + result = detect_liftoff(101325, 0, state_vector, u_dot) + + # Assert + assert bool(result) is False + + +class TestParachuteInitialization: + """Test parachute creation with different trigger types.""" + + def test_parachute_string_trigger_apogee_acc(self): + """Test parachute initialization with 'apogee' string trigger.""" + # Arrange & Act + parachute = Parachute( + name="Main", + cd_s=1.5, + trigger="apogee_acc", + sampling_rate=105, + ) + + # Assert + assert parachute.triggerfunc is not None + assert hasattr(parachute.triggerfunc, "_expects_udot") + assert bool(parachute.triggerfunc._expects_udot) is True + + def test_parachute_string_trigger_burnout(self): + """Test parachute initialization with 'burnout' string trigger.""" + # Arrange & Act + parachute = Parachute( + name="Drogue", + cd_s=0.5, + trigger="burnout", + sampling_rate=100, + ) + + # Assert + assert parachute.triggerfunc is not None + assert hasattr(parachute.triggerfunc, "_expects_udot") + assert bool(parachute.triggerfunc._expects_udot) is True + + def test_parachute_numeric_altitude_trigger(self): + """Test parachute with numeric altitude-based trigger. + + Altitude triggers do not require u_dot computation for performance. + """ + # Arrange & Act + parachute = Parachute( + name="Main", + cd_s=1.5, + trigger=500, + sampling_rate=105, + ) + + # Assert + assert parachute.triggerfunc is not None + assert hasattr(parachute.triggerfunc, "_expects_udot") + assert bool(parachute.triggerfunc._expects_udot) is False + + def test_parachute_callable_trigger_5_args(self): + """Test parachute with custom trigger expecting sensors and u_dot.""" + + # Arrange + def custom_trigger(_p, _h, _y, sensors, u_dot): + return len(sensors) > 0 and u_dot[5] < -5.0 + + # Act + parachute = Parachute( + name="Custom", + cd_s=1.0, + trigger=custom_trigger, + sampling_rate=100, + ) + + # Assert + assert parachute.triggerfunc is not None + assert hasattr(parachute.triggerfunc, "_expects_udot") + assert hasattr(parachute.triggerfunc, "_expects_sensors") + + def test_parachute_invalid_string_trigger(self): + """Test parachute initialization raises ValueError for invalid trigger string.""" + # Arrange & Act & Assert + with pytest.raises(ValueError, match="Unable to set the trigger function"): + Parachute( + name="Invalid", + cd_s=1.0, + trigger="invalid_trigger_name", + sampling_rate=100, + ) + + +class TestEdgeCases: + """Test edge cases and error conditions.""" + + def test_trigger_with_nan_values(self): + """Test that triggers handle NaN values gracefully. + + Triggers should return False (safe default) when data is invalid. + """ + # Arrange + state_vector = [0, 0, 500, 0, 0, float("nan"), 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0, 0, float("nan"), 0, 0, 0, 0, 0, 0, 0] + + # Act + result_apogee = detect_apogee_acceleration(101325, 500, state_vector, u_dot) + result_burnout = detect_motor_burnout(101325, 500, state_vector, u_dot) + result_freefall = detect_freefall(101325, 500, state_vector, u_dot) + result_liftoff = detect_liftoff(101325, 500, state_vector, u_dot) + + # Assert + assert bool(result_apogee) is False + assert bool(result_burnout) is False + assert bool(result_freefall) is False + assert bool(result_liftoff) is False + + def test_trigger_with_inf_values(self): + """Test that triggers handle infinity values safely.""" + # Arrange + state_vector = [0, 0, 500, 0, 0, float("inf"), 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0, 0, float("inf"), 0, 0, 0, 0, 0, 0, 0] + + # Act & Assert - should not raise, just return False + result_apogee = detect_apogee_acceleration(101325, 500, state_vector, u_dot) + result_freefall = detect_freefall(101325, 500, state_vector, u_dot) + + assert bool(result_apogee) is False + assert bool(result_freefall) is False + + def test_trigger_at_very_low_altitude(self): + """Test triggers near ground level (edge case). + + Some triggers have altitude thresholds to prevent false positives. + """ + # Arrange + state_vector = [0, 0, 1, 0, 0, -5, 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0, 0, -0.5, 0, 0, 0, 0, 0, 0, 0] + + # Act + result_burnout = detect_motor_burnout(101325, 1, state_vector, u_dot) + result_freefall = detect_freefall(101325, 1, state_vector, u_dot) + + # Assert - should not trigger at very low altitude + assert bool(result_burnout) is False + assert bool(result_freefall) is False + + def test_trigger_with_no_noise_function(self): + """Test trigger evaluation when acceleration_noise_function is None.""" + + # Arrange + def derivative_func(_t, _y): + return np.array([0, 0, 0, 1.0, 2.0, 3.0, 0, 0, 0, 0, 0, 0, 0]) + + recorded = {} + + def user_trigger(_p, _h, _y, u_dot): + recorded["u_dot"] = np.array(u_dot) + return False + + parachute = Parachute( + name="test", + cd_s=1.0, + trigger=user_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = None # No noise + + # Act + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=10.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=0.0, + ) + + # Assert + assert res is False + assert "u_dot" in recorded + assert np.allclose(recorded["u_dot"][3:6], np.array([1.0, 2.0, 3.0])) + + def test_trigger_derivative_computation_error(self): + """Test trigger evaluation handles derivative computation errors gracefully.""" + + # Arrange + def derivative_func(_t, _y): + raise ValueError("Derivative computation failed") + + recorded = {} + + def user_trigger(_p, _h, _y, u_dot): + recorded["called"] = True + recorded["u_dot"] = u_dot + return False + + parachute = Parachute( + name="test", + cd_s=1.0, + trigger=user_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = None + + # Act - should not raise, fallback gracefully + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=10.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=0.0, + ) + + # Assert - function should still be called with None u_dot (fallback) + assert res is False + assert recorded.get("called", False) is True diff --git a/tests/unit/test_parachute_triggers.py b/tests/unit/test_parachute_triggers.py new file mode 100644 index 000000000..bd7dbbf2f --- /dev/null +++ b/tests/unit/test_parachute_triggers.py @@ -0,0 +1,141 @@ +import traceback + +import numpy as np + +from rocketpy.simulation.flight import Flight +from rocketpy.rocket.parachute import Parachute + + +def _test_trigger_receives_u_dot_and_noise(): + def derivative_func(_t, _y): + return np.array([0, 0, 0, 1.0, 2.0, 3.0, 0, 0, 0, 0, 0, 0, 0]) + + recorded = {} + + def user_trigger(_p, _h, _y, u_dot): + recorded["u_dot"] = np.array(u_dot) + return True + + parachute = Parachute( + name="test", + cd_s=1.0, + trigger=user_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = lambda: np.array([0.1, -0.2, 0.3]) + + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=10.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=0.0, + ) + + assert res is True + assert "u_dot" in recorded + assert np.allclose(recorded["u_dot"][3:6], np.array([1.1, 1.8, 3.3])) + + +def _test_trigger_with_u_dot_only(): + """Test trigger that only expects u_dot (no sensors).""" + + def derivative_func(_t, _y): + return np.array([0, 0, 0, -1.0, -2.0, -3.0, 0, 0, 0, 0, 0, 0, 0]) + + recorded = {} + + def user_trigger(_p, _h, _y, u_dot): + recorded["u_dot"] = np.array(u_dot) + return False + + parachute = Parachute( + name="test_u_dot_only", + cd_s=1.0, + trigger=user_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = lambda: np.array([0.0, 0.0, 0.0]) + + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=5.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=1.234, + ) + + assert res is False + assert "u_dot" in recorded + assert np.allclose(recorded["u_dot"][3:6], np.array([-1.0, -2.0, -3.0])) + + +def _test_legacy_trigger_does_not_compute_u_dot(): + def derivative_func(_t, _y): + raise RuntimeError("derivative should not be called for legacy triggers") + + called = {} + + def legacy_trigger(_p, _h, _y): + called["ok"] = True + return True + + parachute = Parachute( + name="legacy", + cd_s=1.0, + trigger=legacy_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = lambda: np.zeros(3) + + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=0.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=0.0, + ) + + assert res is True + assert called.get("ok", False) is True + + +def run_all(): + tests = [ + _test_trigger_receives_u_dot_and_noise, + _test_trigger_with_u_dot_only, + _test_legacy_trigger_does_not_compute_u_dot, + ] + failures = 0 + for t in tests: + name = t.__name__ + try: + t() + print(f"[PASS] {name}") + except Exception: # pylint: disable=broad-exception-caught + failures += 1 + print(f"[FAIL] {name}") + traceback.print_exc() + if failures: + print(f"{failures} test(s) failed") + raise SystemExit(1) + print("All tests passed") + + +if __name__ == "__main__": + run_all()