From 25759506f8006c2d699f6df42ad58edf8fe33fbc Mon Sep 17 00:00:00 2001 From: ViniciusCMB Date: Thu, 4 Dec 2025 14:49:48 -0300 Subject: [PATCH 01/11] ENH: Pass Acceleration Data to Parachute Trigger Functions (RocketPy-Team#XXX) * ENH: add u_dot parameter computation inside parachute trigger evaluation. * ENH: add acceleration_noise_function parameter to Flight class for realistic IMU simulation. * ENH: implement automatic detection of trigger signature to compute derivatives only when needed. * TST: add unit tests for parachute trigger with acceleration data and noise injection. * TST: add test runner for trigger acceleration validation without full test suite dependencies. --- rocketpy/simulation/flight.py | 137 ++++++++++++++--- tests/unit/_run_parachute_trigger_tests.py | 139 ++++++++++++++++++ .../test_parachute_trigger_acceleration.py | 121 +++++++++++++++ 3 files changed, 378 insertions(+), 19 deletions(-) create mode 100644 tests/unit/_run_parachute_trigger_tests.py create mode 100644 tests/unit/test_parachute_trigger_acceleration.py diff --git a/rocketpy/simulation/flight.py b/rocketpy/simulation/flight.py index a38be7d93..85912dd80 100644 --- a/rocketpy/simulation/flight.py +++ b/rocketpy/simulation/flight.py @@ -5,6 +5,7 @@ from functools import cached_property import numpy as np +import inspect from scipy.integrate import BDF, DOP853, LSODA, RK23, RK45, OdeSolver, Radau from rocketpy.simulation.flight_data_exporter import FlightDataExporter @@ -94,6 +95,12 @@ class Flight: function evaluation points and then interpolation is used to calculate them and feed the triggers. Can greatly improve run time in some cases. + Note + ---- + Calculating the derivative `u_dot` inside parachute trigger checks will + cause additional calls to the equations of motion (extra physics + evaluations). This increases CPU cost but enables realistic avionics + algorithms that rely on accelerometer data. Flight.terminate_on_apogee : bool Whether to terminate simulation when rocket reaches apogee. Flight.solver : scipy.integrate.LSODA @@ -487,6 +494,7 @@ def __init__( # pylint: disable=too-many-arguments,too-many-statements name="Flight", equations_of_motion="standard", ode_solver="LSODA", + acceleration_noise_function=None, ): """Run a trajectory simulation. @@ -600,6 +608,13 @@ def __init__( # pylint: disable=too-many-arguments,too-many-statements self.name = name self.equations_of_motion = equations_of_motion self.ode_solver = ode_solver + # Function that returns accelerometer noise vector [ax_noise, ay_noise, az_noise] + # It should be a callable that returns an array-like of length 3. + self.acceleration_noise_function = ( + acceleration_noise_function + if acceleration_noise_function is not None + else (lambda: np.zeros(3)) + ) # Controller initialization self.__init_controllers() @@ -739,11 +754,14 @@ def __simulate(self, verbose): ) = self.__calculate_and_save_pressure_signals( parachute, node.t, self.y_sol[2] ) - if parachute.triggerfunc( + if self._evaluate_parachute_trigger( + parachute, noisy_pressure, height_above_ground_level, self.y_sol, self.sensors, + phase.derivative, + self.t, ): # Remove parachute from flight parachutes self.parachutes.remove(parachute) @@ -772,8 +790,7 @@ def __simulate(self, verbose): lambda self, parachute_porosity=parachute.porosity: setattr( self, "parachute_porosity", parachute_porosity ), - lambda self, - added_mass_coefficient=parachute.added_mass_coefficient: setattr( + lambda self, added_mass_coefficient=parachute.added_mass_coefficient: setattr( self, "parachute_added_mass_coefficient", added_mass_coefficient, @@ -998,11 +1015,14 @@ def __simulate(self, verbose): ) # Check for parachute trigger - if parachute.triggerfunc( + if self._evaluate_parachute_trigger( + parachute, noisy_pressure, height_above_ground_level, overshootable_node.y_sol, self.sensors, + phase.derivative, + overshootable_node.t, ): # Remove parachute from flight parachutes self.parachutes.remove(parachute) @@ -1020,30 +1040,25 @@ def __simulate(self, verbose): i += 1 # Create flight phase for time after inflation callbacks = [ - lambda self, - parachute_cd_s=parachute.cd_s: setattr( + lambda self, parachute_cd_s=parachute.cd_s: setattr( self, "parachute_cd_s", parachute_cd_s ), - lambda self, - parachute_radius=parachute.radius: setattr( + lambda self, parachute_radius=parachute.radius: setattr( self, "parachute_radius", parachute_radius, ), - lambda self, - parachute_height=parachute.height: setattr( + lambda self, parachute_height=parachute.height: setattr( self, "parachute_height", parachute_height, ), - lambda self, - parachute_porosity=parachute.porosity: setattr( + lambda self, parachute_porosity=parachute.porosity: setattr( self, "parachute_porosity", parachute_porosity, ), - lambda self, - added_mass_coefficient=parachute.added_mass_coefficient: setattr( + lambda self, added_mass_coefficient=parachute.added_mass_coefficient: setattr( self, "parachute_added_mass_coefficient", added_mass_coefficient, @@ -1124,6 +1139,88 @@ def __calculate_and_save_pressure_signals(self, parachute, t, z): return noisy_pressure, height_above_ground_level + def _evaluate_parachute_trigger( + self, parachute, pressure, height, y, sensors, derivative_func, t + ): + """Evaluate parachute trigger, optionally computing u_dot (acceleration). + + This helper preserves backward compatibility with existing trigger + signatures and will compute ``u_dot`` only if the original user + provided trigger function expects an acceleration argument (detected + by parameter name such as 'u_dot', 'udot', 'acc', or 'acceleration'). + + Parameters + ---------- + parachute : Parachute + Parachute object. + pressure : float + Noisy pressure value passed to trigger. + height : float + Height above ground level passed to trigger. + y : array + State vector at evaluation time. + sensors : list + Sensors list passed to trigger. + derivative_func : callable + Function to compute derivatives: derivative_func(t, y) + t : float + Time at which to evaluate derivatives. + + Returns + ------- + bool + True if trigger condition met, False otherwise. + """ + # If original trigger is not callable (e.g. numeric or 'apogee'), + # use the prepared wrapper in Parachute + trig_original = parachute.trigger + if not callable(trig_original): + return parachute.triggerfunc(pressure, height, y, sensors) + + try: + sig = inspect.signature(trig_original) + params = list(sig.parameters.values()) + except (ValueError, TypeError): + return parachute.triggerfunc(pressure, height, y, sensors) + + # Detect whether the user-provided trigger expects acceleration + acc_names = {"u_dot", "udot", "acc", "acceleration"} + wants_u_dot = any(p.name in acc_names for p in params) + wants_sensors = any("sensor" in p.name for p in params) + + if wants_u_dot: + # Compute derivative and add optional accelerometer noise + u_dot = np.array(derivative_func(t, y), dtype=float) + try: + noise = np.asarray(self.acceleration_noise_function()) + if noise.size == 3: + # u_dot layout: [vx, vy, vz, ax, ay, az, ...] + u_dot[3:6] = u_dot[3:6] + noise + except Exception: + # If noise function fails, ignore and continue + pass + + # Call user function according to detected signature + try: + if wants_sensors: + # common case: (p, h, y, sensors, u_dot) + return trig_original(pressure, height, y, sensors, u_dot) + # fallback by arg count + if len(params) == 4: + # could be (p, h, y, u_dot) + return trig_original(pressure, height, y, u_dot) + if len(params) == 5: + # could be (p, h, y, sensors, u_dot) + return trig_original(pressure, height, y, sensors, u_dot) + # try calling with u_dot as kwarg + return trig_original(pressure, height, y, u_dot=u_dot) + except TypeError: + # If calling the original fails, fallback to wrapper + return parachute.triggerfunc(pressure, height, y, sensors) + + # Default: don't compute u_dot and use existing wrapper + return parachute.triggerfunc(pressure, height, y, sensors) + def __init_solution_monitors(self): # Initialize solution monitors self.out_of_rail_time = 0 @@ -1439,7 +1536,9 @@ def udot_rail2(self, t, u, post_processing=False): # pragma: no cover # Hey! We will finish this function later, now we just can use u_dot return self.u_dot_generalized(t, u, post_processing=post_processing) - def u_dot(self, t, u, post_processing=False): # pylint: disable=too-many-locals,too-many-statements + def u_dot( + self, t, u, post_processing=False + ): # pylint: disable=too-many-locals,too-many-statements """Calculates derivative of u state vector with respect to time when rocket is flying in 6 DOF motion during ascent out of rail and descent without parachute. @@ -1759,7 +1858,9 @@ def u_dot(self, t, u, post_processing=False): # pylint: disable=too-many-locals return u_dot - def u_dot_generalized(self, t, u, post_processing=False): # pylint: disable=too-many-locals,too-many-statements + def u_dot_generalized( + self, t, u, post_processing=False + ): # pylint: disable=too-many-locals,too-many-statements """Calculates derivative of u state vector with respect to time when the rocket is flying in 6 DOF motion in space and significant mass variation effects exist. Typical flight phases include powered ascent after launch @@ -3571,9 +3672,7 @@ def add(self, flight_phase, index=None): # TODO: quite complex method new_index = ( index - 1 if flight_phase.t < previous_phase.t - else index + 1 - if flight_phase.t > next_phase.t - else index + else index + 1 if flight_phase.t > next_phase.t else index ) flight_phase.t += adjust self.add(flight_phase, new_index) diff --git a/tests/unit/_run_parachute_trigger_tests.py b/tests/unit/_run_parachute_trigger_tests.py new file mode 100644 index 000000000..edf2c2261 --- /dev/null +++ b/tests/unit/_run_parachute_trigger_tests.py @@ -0,0 +1,139 @@ +import numpy as np +import traceback + +from rocketpy.simulation.flight import Flight +from rocketpy.rocket.parachute import Parachute + + +def _test_trigger_receives_u_dot_and_noise(): + def derivative_func(t, y): + return np.array([0, 0, 0, 1.0, 2.0, 3.0, 0, 0, 0, 0, 0, 0, 0]) + + recorded = {} + + def user_trigger(p, h, y, u_dot): + recorded["u_dot"] = np.array(u_dot) + return True + + parachute = Parachute( + name="test", + cd_s=1.0, + trigger=user_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = lambda: np.array([0.1, -0.2, 0.3]) + + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=10.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=0.0, + ) + + assert res is True + assert "u_dot" in recorded + assert np.allclose(recorded["u_dot"][3:6], np.array([1.1, 1.8, 3.3])) + + +def _test_trigger_with_sensors_and_u_dot(): + def derivative_func(t, y): + return np.array([0, 0, 0, -1.0, -2.0, -3.0, 0, 0, 0, 0, 0, 0, 0]) + + recorded = {} + + def user_trigger(p, h, y, sensors, u_dot): + recorded["sensors"] = sensors + recorded["u_dot"] = np.array(u_dot) + return False + + parachute = Parachute( + name="test2", + cd_s=1.0, + trigger=user_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = lambda: np.array([0.0, 0.0, 0.0]) + + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=5.0, + y=np.zeros(13), + sensors=["s1"], + derivative_func=derivative_func, + t=1.234, + ) + + assert res is False + assert recorded["sensors"] == ["s1"] + assert np.allclose(recorded["u_dot"][3:6], np.array([-1.0, -2.0, -3.0])) + + +def _test_legacy_trigger_does_not_compute_u_dot(): + def derivative_func(t, y): + raise RuntimeError("derivative should not be called for legacy triggers") + + called = {} + + def legacy_trigger(p, h, y): + called["ok"] = True + return True + + parachute = Parachute( + name="legacy", + cd_s=1.0, + trigger=legacy_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = lambda: np.zeros(3) + + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=0.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=0.0, + ) + + assert res is True + assert called.get("ok", False) is True + + +def run_all(): + tests = [ + _test_trigger_receives_u_dot_and_noise, + _test_trigger_with_sensors_and_u_dot, + _test_legacy_trigger_does_not_compute_u_dot, + ] + failures = 0 + for t in tests: + name = t.__name__ + try: + t() + print(f"[PASS] {name}") + except Exception: + failures += 1 + print(f"[FAIL] {name}") + traceback.print_exc() + if failures: + print(f"{failures} test(s) failed") + raise SystemExit(1) + print("All tests passed") + + +if __name__ == "__main__": + run_all() diff --git a/tests/unit/test_parachute_trigger_acceleration.py b/tests/unit/test_parachute_trigger_acceleration.py new file mode 100644 index 000000000..ef046314b --- /dev/null +++ b/tests/unit/test_parachute_trigger_acceleration.py @@ -0,0 +1,121 @@ +import numpy as np +import pytest + +from rocketpy.simulation.flight import Flight +from rocketpy.rocket.parachute import Parachute + + +def test_trigger_receives_u_dot_and_noise(): + # Prepare derivative function that returns known u_dot + def derivative_func(t, y): + return np.array([0, 0, 0, 1.0, 2.0, 3.0, 0, 0, 0, 0, 0, 0, 0]) + + recorded = {} + + # User trigger expecting u_dot named exactly 'u_dot' + def user_trigger(p, h, y, u_dot): + recorded["u_dot"] = np.array(u_dot) + return True + + parachute = Parachute( + name="test", + cd_s=1.0, + trigger=user_trigger, + sampling_rate=100, + ) + + # Dummy flight-like object with acceleration_noise_function + dummy = type("D", (), {})() + dummy.acceleration_noise_function = lambda: np.array([0.1, -0.2, 0.3]) + + # Call the helper directly without instantiating full Flight + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=10.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=0.0, + ) + + assert res is True + assert "u_dot" in recorded + # Original derivative accelerations were [1.0,2.0,3.0] + # After noise [0.1,-0.2,0.3] -> [1.1, 1.8, 3.3] + assert np.allclose(recorded["u_dot"][3:6], np.array([1.1, 1.8, 3.3])) + + +def test_trigger_with_sensors_and_u_dot(): + def derivative_func(t, y): + return np.array([0, 0, 0, -1.0, -2.0, -3.0, 0, 0, 0, 0, 0, 0, 0]) + + recorded = {} + + def user_trigger(p, h, y, sensors, u_dot): + recorded["sensors"] = sensors + recorded["u_dot"] = np.array(u_dot) + return False + + parachute = Parachute( + name="test2", + cd_s=1.0, + trigger=user_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = lambda: np.array([0.0, 0.0, 0.0]) + + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=5.0, + y=np.zeros(13), + sensors=["s1"], + derivative_func=derivative_func, + t=1.234, + ) + + assert res is False + assert recorded["sensors"] == ["s1"] + assert np.allclose(recorded["u_dot"][3:6], np.array([-1.0, -2.0, -3.0])) + + +def test_legacy_trigger_does_not_compute_u_dot(): + # derivative function that raises if called + def derivative_func(t, y): + raise RuntimeError("derivative should not be called for legacy triggers") + + called = {} + + def legacy_trigger(p, h, y): + called["ok"] = True + return True + + parachute = Parachute( + name="legacy", + cd_s=1.0, + trigger=legacy_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = lambda: np.zeros(3) + + # Should not raise + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=0.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=0.0, + ) + + assert res is True + assert called.get("ok", False) is True From 7ec7ac9e2ac7b2642350c7620bbd9d510577ce2c Mon Sep 17 00:00:00 2001 From: ViniciusCMB Date: Sun, 7 Dec 2025 16:23:51 -0300 Subject: [PATCH 02/11] ENH: Add acceleration-based parachute triggers with IMU sensor simulation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This enhancement enables realistic avionics algorithms by providing access to acceleration data (u_dot) within parachute trigger functions, simulating how real flight computers use accelerometers (IMU) to detect flight phases. * ENH: Pass acceleration data (u_dot) to parachute trigger callbacks - Flight class now computes and injects u_dot derivatives into triggers - Automatic detection of trigger signatures to optimize performance - Only calculates u_dot when trigger explicitly requires it * ENH: Add built-in acceleration-based trigger functions - detect_apogee_acceleration: Detects apogee via vz ≈ 0 and az < 0 - detect_motor_burnout: Detects motor shutdown by acceleration drop - detect_freefall: Detects free-fall condition (low total acceleration) - detect_liftoff: Detects liftoff by high total acceleration - altitude_trigger_factory: Factory for altitude-based triggers * ENH: Implement optional accelerometer noise injection - New parameter acceleration_noise_function in Flight.__init__ - Simulates MEMS accelerometer noise (typical 0.1-0.3 m/s²) - Applied transparently to all acceleration-based triggers * TST: Add comprehensive unit tests for trigger evaluation - Validates u_dot computation and noise injection - Tests backward compatibility with legacy 3-parameter triggers - Ensures performance optimization skips u_dot for non-accelerating triggers Notes ----- - Triggers can now use signatures: (p, h, y) or (p, h, y, u_dot/acc/acceleration) - Built-in string triggers: apogee, apogee_acc, burnout, freefall, liftoff - Performance: u_dot computation doubles physics evaluations at trigger points - Fully backward compatible with existing altitude and custom triggers --- rocketpy/rocket/parachute.py | 230 ++++++++++++++++-- rocketpy/simulation/flight.py | 88 +++---- ...er_tests.py => test_parachute_triggers.py} | 15 +- 3 files changed, 253 insertions(+), 80 deletions(-) rename tests/unit/{_run_parachute_trigger_tests.py => test_parachute_triggers.py} (92%) diff --git a/rocketpy/rocket/parachute.py b/rocketpy/rocket/parachute.py index e27216e26..0892d9e22 100644 --- a/rocketpy/rocket/parachute.py +++ b/rocketpy/rocket/parachute.py @@ -7,6 +7,145 @@ from ..mathutils.function import Function from ..prints.parachute_prints import _ParachutePrints +def detect_motor_burnout(pressure, height, state_vector, u_dot): + """Detect motor burnout by sudden drop in acceleration. + + Returns True when vertical acceleration becomes significantly negative + (indicating end of propulsion phase) OR when total acceleration drops below + a threshold indicating coasting/free-fall has begun. + """ + try: + if u_dot is None or len(u_dot) < 6: + return False + + ax = float(u_dot[3]) + ay = float(u_dot[4]) + az = float(u_dot[5]) + + # Defensive checks for NaN/Inf + if not all(np.isfinite([ax, ay, az])): + return False + + total_acc = np.sqrt(ax * ax + ay * ay + az * az) + if not np.isfinite(total_acc): + return False + + # Additional safety: ignore spurious low-accel readings at t~0 by + # requiring the rocket to be above a small altitude and still ascending + vz = float(state_vector[5]) if len(state_vector) > 5 else 0 + if not np.isfinite(vz): + return False + + if height < 5.0 or vz <= 0.5: + return False + + # Burnout detected when: + # 1. Vertical acceleration becomes very negative (end of thrust phase) + # 2. OR total acceleration drops below 2.0 m/s² (coasting detected) + return az < -8.0 or total_acc < 2.0 + except (ValueError, TypeError, IndexError): + return False + + +def detect_apogee_acceleration(pressure, height, state_vector, u_dot): + """Detect apogee using near-zero vertical velocity and negative vertical accel. + + Apogee occurs when the rocket reaches its highest point, characterized by + vertical velocity approaching zero and negative (downward) acceleration. + """ + try: + if state_vector is None or u_dot is None: + return False + if len(state_vector) < 6 or len(u_dot) < 6: + return False + + vz = float(state_vector[5]) + az = float(u_dot[5]) + if not all(np.isfinite([vz, az])): + return False + + # Slightly more permissive thresholds to avoid spurious misses + return abs(vz) < 1.0 and az < -0.1 + except (ValueError, TypeError, IndexError): + return False + + +def detect_freefall(pressure, height, state_vector, u_dot): + """Detect free-fall when total acceleration magnitude is low. + + Free-fall is characterized by acceleration magnitude close to gravitational + acceleration (approximately -g in the vertical direction), or when the rocket + is in a ballistic coasting phase with minimal thrust or drag effects. + """ + try: + if u_dot is None or len(u_dot) < 6: + return False + + ax = float(u_dot[3]) + ay = float(u_dot[4]) + az = float(u_dot[5]) + if not all(np.isfinite([ax, ay, az])): + return False + + total_acc = np.sqrt(ax * ax + ay * ay + az * az) + if not np.isfinite(total_acc): + return False + + # Require the rocket to be descending and above a small altitude to + # avoid false positives before launch. + vz = float(state_vector[5]) if len(state_vector) > 5 else 0 + if not np.isfinite(vz): + return False + + if height < 5.0 or vz >= -0.2: + return False + + # More sensitive threshold: detect free-fall at lower acceleration + return total_acc < 11.5 + except (ValueError, TypeError, IndexError): + return False + + +def detect_liftoff(pressure, height, state_vector, u_dot): + """Detect liftoff by high total acceleration. + + Liftoff is characterized by a sudden increase in acceleration as the motor + ignites and begins producing thrust. + """ + try: + if u_dot is None or len(u_dot) < 6: + return False + + ax = float(u_dot[3]) + ay = float(u_dot[4]) + az = float(u_dot[5]) + if not all(np.isfinite([ax, ay, az])): + return False + + total_acc = np.sqrt(ax * ax + ay * ay + az * az) + if not np.isfinite(total_acc): + return False + + return total_acc > 15.0 + except (ValueError, TypeError, IndexError): + return False + + +def altitude_trigger_factory(target_altitude, require_descent=True): + """Return a trigger that deploys when altitude <= target_altitude. + + If require_descent is True, also require vertical velocity negative + (descending) to avoid firing during ascent. + """ + + def trigger(pressure, height, state_vector, u_dot=None): + vz = float(state_vector[5]) + if require_descent: + return (height <= target_altitude) and (vz < 0) + return height <= target_altitude + + return trigger + class Parachute: """Keeps information of the parachute, which is modeled as a hemispheroid. @@ -222,23 +361,52 @@ def __init__( def __evaluate_trigger_function(self, trigger): """This is used to set the triggerfunc attribute that will be used to interact with the Flight class. + + Notes + ----- + The resulting triggerfunc always has signature (p, h, y, fourth) so + Flight can pass either the sensors list or the u_dot (derivative) + depending on the runtime behaviour. """ # pylint: disable=unused-argument, function-redefined - # The parachute is deployed by a custom function - if callable(trigger): - # work around for having added sensors to parachute triggers - # to avoid breaking changes - triggerfunc = trigger - sig = signature(triggerfunc) - if len(sig.parameters) == 3: - def triggerfunc(p, h, y, sensors): - return trigger(p, h, y) + # Helper to wrap any callable to the internal (p, h, y, fourth) API + def _make_wrapper(fn): + sig = signature(fn) + params = list(sig.parameters.keys()) - self.triggerfunc = triggerfunc + # detect if user function expects acceleration-like argument + expects_udot = any( + name.lower() in ("u_dot", "udot", "acc", "acceleration") + for name in params[3:] + ) + + def wrapper(p, h, y, fourth): # fourth can be sensors or u_dot + # Support both 3- and 4-arg user functions + num_params = len(sig.parameters) + if num_params == 3: + return fn(p, h, y) + if num_params == 4: + return fn(p, h, y, fourth) + # fallback: try calling with four args, otherwise three + try: + return fn(p, h, y, fourth) + except TypeError: + return fn(p, h, y) + + # attach metadata so Flight can decide whether to compute u_dot + wrapper._expects_udot = expects_udot + wrapper._wrapped_fn = fn + return wrapper + + # Callable provided by user + if callable(trigger): + self.triggerfunc = _make_wrapper(trigger) + return + + # Numeric altitude trigger + if isinstance(trigger, (int, float)): - elif isinstance(trigger, (int, float)): - # The parachute is deployed at a given height def triggerfunc(p, h, y, sensors): # pylint: disable=unused-argument # p = pressure considering parachute noise signal # h = height above ground level considering parachute noise signal @@ -246,23 +414,37 @@ def triggerfunc(p, h, y, sensors): # pylint: disable=unused-argument return y[5] < 0 and h < trigger self.triggerfunc = triggerfunc + return + + # String: map to built-in triggers + if isinstance(trigger, str): + key = trigger.strip().lower() + mapping = { + "apogee_acc": detect_apogee_acceleration, + "burnout": detect_motor_burnout, + "freefall": detect_freefall, + "liftoff": detect_liftoff, + } + if key in mapping: + self.triggerfunc = _make_wrapper(mapping[key]) + return + + # Special case: "apogee" (legacy support) + if isinstance(trigger, str) and trigger.lower() == "apogee": - elif trigger.lower() == "apogee": - # The parachute is deployed at apogee def triggerfunc(p, h, y, sensors): # pylint: disable=unused-argument - # p = pressure considering parachute noise signal - # h = height above ground level considering parachute noise signal - # y = [x, y, z, vx, vy, vz, e0, e1, e2, e3, w1, w2, w3] return y[5] < 0 self.triggerfunc = triggerfunc - - else: - raise ValueError( - f"Unable to set the trigger function for parachute '{self.name}'. " - + "Trigger must be a callable, a float value or the string 'apogee'. " - + "See the Parachute class documentation for more information." - ) + return + + # If we reach this point, the trigger is invalid + raise ValueError( + f"Unable to set the trigger function for parachute '{self.name}'. " + + "Trigger must be a callable, a float value or one of the strings " + + "('apogee','burnout','freefall','liftoff'). " + + "See the Parachute class documentation for more information." + ) def __str__(self): """Returns a string representation of the Parachute class. diff --git a/rocketpy/simulation/flight.py b/rocketpy/simulation/flight.py index 85912dd80..8f16ffeb0 100644 --- a/rocketpy/simulation/flight.py +++ b/rocketpy/simulation/flight.py @@ -1145,9 +1145,10 @@ def _evaluate_parachute_trigger( """Evaluate parachute trigger, optionally computing u_dot (acceleration). This helper preserves backward compatibility with existing trigger - signatures and will compute ``u_dot`` only if the original user - provided trigger function expects an acceleration argument (detected - by parameter name such as 'u_dot', 'udot', 'acc', or 'acceleration'). + signatures and will compute ``u_dot`` only if the prepared wrapper in + `Parachute` or the original trigger signature requests an acceleration + argument (detected by parameter name such as 'u_dot', 'udot', 'acc', + or 'acceleration'). Parameters ---------- @@ -1171,55 +1172,44 @@ def _evaluate_parachute_trigger( bool True if trigger condition met, False otherwise. """ - # If original trigger is not callable (e.g. numeric or 'apogee'), - # use the prepared wrapper in Parachute - trig_original = parachute.trigger - if not callable(trig_original): - return parachute.triggerfunc(pressure, height, y, sensors) - - try: - sig = inspect.signature(trig_original) - params = list(sig.parameters.values()) - except (ValueError, TypeError): - return parachute.triggerfunc(pressure, height, y, sensors) - - # Detect whether the user-provided trigger expects acceleration - acc_names = {"u_dot", "udot", "acc", "acceleration"} - wants_u_dot = any(p.name in acc_names for p in params) - wants_sensors = any("sensor" in p.name for p in params) - - if wants_u_dot: - # Compute derivative and add optional accelerometer noise - u_dot = np.array(derivative_func(t, y), dtype=float) + # Prefer the wrapper metadata: check if the prepared wrapper expects u_dot + triggerfunc = parachute.triggerfunc + expects_udot = getattr(triggerfunc, "_expects_udot", False) + + # If the wrapper didn't advertise, inspect the original user trigger + if not expects_udot: + trig_original = getattr(parachute, "trigger", None) + if callable(trig_original): + try: + sig = inspect.signature(trig_original) + params = list(sig.parameters.values()) + acc_names = {"u_dot", "udot", "acc", "acceleration"} + if any(p.name.lower() in acc_names for p in params): + expects_udot = True + except (ValueError, TypeError): + expects_udot = False + + # If the trigger expects acceleration, compute u_dot and inject noise + if expects_udot: try: - noise = np.asarray(self.acceleration_noise_function()) - if noise.size == 3: - # u_dot layout: [vx, vy, vz, ax, ay, az, ...] - u_dot[3:6] = u_dot[3:6] + noise + u_dot = np.array(derivative_func(t, y), dtype=float) + try: + noise = np.asarray(self.acceleration_noise_function()) + if noise.size == 3: + # u_dot layout: [vx, vy, vz, ax, ay, az, ...] + u_dot[3:6] = u_dot[3:6] + noise + except Exception: + # ignore noise errors and continue + pass + fourth_arg = u_dot except Exception: - # If noise function fails, ignore and continue - pass + # Fallback to sensors if derivative computation fails + fourth_arg = sensors + else: + fourth_arg = sensors - # Call user function according to detected signature - try: - if wants_sensors: - # common case: (p, h, y, sensors, u_dot) - return trig_original(pressure, height, y, sensors, u_dot) - # fallback by arg count - if len(params) == 4: - # could be (p, h, y, u_dot) - return trig_original(pressure, height, y, u_dot) - if len(params) == 5: - # could be (p, h, y, sensors, u_dot) - return trig_original(pressure, height, y, sensors, u_dot) - # try calling with u_dot as kwarg - return trig_original(pressure, height, y, u_dot=u_dot) - except TypeError: - # If calling the original fails, fallback to wrapper - return parachute.triggerfunc(pressure, height, y, sensors) - - # Default: don't compute u_dot and use existing wrapper - return parachute.triggerfunc(pressure, height, y, sensors) + # Call the prepared wrapper (it will forward args to the user's fn) + return triggerfunc(pressure, height, y, fourth_arg) def __init_solution_monitors(self): # Initialize solution monitors diff --git a/tests/unit/_run_parachute_trigger_tests.py b/tests/unit/test_parachute_triggers.py similarity index 92% rename from tests/unit/_run_parachute_trigger_tests.py rename to tests/unit/test_parachute_triggers.py index edf2c2261..db8401fb2 100644 --- a/tests/unit/_run_parachute_trigger_tests.py +++ b/tests/unit/test_parachute_triggers.py @@ -41,19 +41,20 @@ def user_trigger(p, h, y, u_dot): assert np.allclose(recorded["u_dot"][3:6], np.array([1.1, 1.8, 3.3])) -def _test_trigger_with_sensors_and_u_dot(): +def _test_trigger_with_u_dot_only(): + """Test trigger that only expects u_dot (no sensors).""" + def derivative_func(t, y): return np.array([0, 0, 0, -1.0, -2.0, -3.0, 0, 0, 0, 0, 0, 0, 0]) recorded = {} - def user_trigger(p, h, y, sensors, u_dot): - recorded["sensors"] = sensors + def user_trigger(p, h, y, u_dot): recorded["u_dot"] = np.array(u_dot) return False parachute = Parachute( - name="test2", + name="test_u_dot_only", cd_s=1.0, trigger=user_trigger, sampling_rate=100, @@ -68,13 +69,13 @@ def user_trigger(p, h, y, sensors, u_dot): pressure=0.0, height=5.0, y=np.zeros(13), - sensors=["s1"], + sensors=[], derivative_func=derivative_func, t=1.234, ) assert res is False - assert recorded["sensors"] == ["s1"] + assert "u_dot" in recorded assert np.allclose(recorded["u_dot"][3:6], np.array([-1.0, -2.0, -3.0])) @@ -116,7 +117,7 @@ def legacy_trigger(p, h, y): def run_all(): tests = [ _test_trigger_receives_u_dot_and_noise, - _test_trigger_with_sensors_and_u_dot, + _test_trigger_with_u_dot_only, _test_legacy_trigger_does_not_compute_u_dot, ] failures = 0 From 18aab57f1166b225709cae7068138717ae761b24 Mon Sep 17 00:00:00 2001 From: ViniciusCMB Date: Mon, 8 Dec 2025 17:04:34 -0300 Subject: [PATCH 03/11] ENH: Enable sensors+acceleration triggers in parachutes Allow parachute triggers to accept (p, h, y, sensors, u_dot); Flight passes sensors+u_dot, computing u_dot on demand with noise; numeric/apogee legacy triggers carry metadata.\n\nTests: pytest tests/unit/test_parachute_trigger_acceleration.py -v\nLint: black rocketpy tests && ruff check rocketpy tests --- rocketpy/rocket/parachute.py | 45 ++++++++++++++++++++-------- rocketpy/simulation/flight.py | 56 ++++++++++++++++++----------------- 2 files changed, 62 insertions(+), 39 deletions(-) diff --git a/rocketpy/rocket/parachute.py b/rocketpy/rocket/parachute.py index 0892d9e22..ecabfea01 100644 --- a/rocketpy/rocket/parachute.py +++ b/rocketpy/rocket/parachute.py @@ -7,6 +7,7 @@ from ..mathutils.function import Function from ..prints.parachute_prints import _ParachutePrints + def detect_motor_burnout(pressure, height, state_vector, u_dot): """Detect motor burnout by sudden drop in acceleration. @@ -364,13 +365,12 @@ def __evaluate_trigger_function(self, trigger): Notes ----- - The resulting triggerfunc always has signature (p, h, y, fourth) so - Flight can pass either the sensors list or the u_dot (derivative) - depending on the runtime behaviour. + The resulting triggerfunc always has signature (p, h, y, sensors, u_dot) + so Flight can pass both sensors and u_dot when needed. """ # pylint: disable=unused-argument, function-redefined - # Helper to wrap any callable to the internal (p, h, y, fourth) API + # Helper to wrap any callable to the internal (p, h, y, sensors, u_dot) API def _make_wrapper(fn): sig = signature(fn) params = list(sig.parameters.keys()) @@ -380,22 +380,39 @@ def _make_wrapper(fn): name.lower() in ("u_dot", "udot", "acc", "acceleration") for name in params[3:] ) + # detect if user function expects sensors argument + expects_sensors = any(name.lower() == "sensors" for name in params[3:]) - def wrapper(p, h, y, fourth): # fourth can be sensors or u_dot - # Support both 3- and 4-arg user functions + def wrapper(p, h, y, sensors, u_dot): + # Support 3, 4, and 5-arg user functions num_params = len(sig.parameters) if num_params == 3: return fn(p, h, y) if num_params == 4: - return fn(p, h, y, fourth) - # fallback: try calling with four args, otherwise three + # Check which 4th arg to pass + fourth_param = params[3].lower() + if fourth_param in ("u_dot", "udot", "acc", "acceleration"): + return fn(p, h, y, u_dot) + else: + return fn(p, h, y, sensors) + if num_params >= 5: + # Pass both sensors and u_dot + return fn(p, h, y, sensors, u_dot) + # fallback: try calling with available args try: - return fn(p, h, y, fourth) + return fn(p, h, y, sensors, u_dot) except TypeError: - return fn(p, h, y) + try: + return fn(p, h, y, u_dot) + except TypeError: + try: + return fn(p, h, y, sensors) + except TypeError: + return fn(p, h, y) # attach metadata so Flight can decide whether to compute u_dot wrapper._expects_udot = expects_udot + wrapper._expects_sensors = expects_sensors wrapper._wrapped_fn = fn return wrapper @@ -407,12 +424,14 @@ def wrapper(p, h, y, fourth): # fourth can be sensors or u_dot # Numeric altitude trigger if isinstance(trigger, (int, float)): - def triggerfunc(p, h, y, sensors): # pylint: disable=unused-argument + def triggerfunc(p, h, y, sensors, u_dot): # pylint: disable=unused-argument # p = pressure considering parachute noise signal # h = height above ground level considering parachute noise signal # y = [x, y, z, vx, vy, vz, e0, e1, e2, e3, w1, w2, w3] return y[5] < 0 and h < trigger + triggerfunc._expects_udot = False + triggerfunc._expects_sensors = True self.triggerfunc = triggerfunc return @@ -432,9 +451,11 @@ def triggerfunc(p, h, y, sensors): # pylint: disable=unused-argument # Special case: "apogee" (legacy support) if isinstance(trigger, str) and trigger.lower() == "apogee": - def triggerfunc(p, h, y, sensors): # pylint: disable=unused-argument + def triggerfunc(p, h, y, sensors, u_dot): # pylint: disable=unused-argument return y[5] < 0 + triggerfunc._expects_udot = False + triggerfunc._expects_sensors = True self.triggerfunc = triggerfunc return diff --git a/rocketpy/simulation/flight.py b/rocketpy/simulation/flight.py index 8f16ffeb0..238dde6b9 100644 --- a/rocketpy/simulation/flight.py +++ b/rocketpy/simulation/flight.py @@ -1142,13 +1142,11 @@ def __calculate_and_save_pressure_signals(self, parachute, t, z): def _evaluate_parachute_trigger( self, parachute, pressure, height, y, sensors, derivative_func, t ): - """Evaluate parachute trigger, optionally computing u_dot (acceleration). + """Evaluate parachute trigger, passing both sensors and u_dot to wrapper. This helper preserves backward compatibility with existing trigger - signatures and will compute ``u_dot`` only if the prepared wrapper in - `Parachute` or the original trigger signature requests an acceleration - argument (detected by parameter name such as 'u_dot', 'udot', 'acc', - or 'acceleration'). + signatures. The wrapper in Parachute always expects (p, h, y, sensors, u_dot) + and Flight computes u_dot only when the trigger requests it (optimization). Parameters ---------- @@ -1172,44 +1170,48 @@ def _evaluate_parachute_trigger( bool True if trigger condition met, False otherwise. """ - # Prefer the wrapper metadata: check if the prepared wrapper expects u_dot triggerfunc = parachute.triggerfunc + + # Check wrapper metadata for expectations expects_udot = getattr(triggerfunc, "_expects_udot", False) + expects_sensors = getattr(triggerfunc, "_expects_sensors", True) - # If the wrapper didn't advertise, inspect the original user trigger - if not expects_udot: + # Fallback: inspect original trigger signature if metadata missing + if not expects_udot and not expects_sensors: trig_original = getattr(parachute, "trigger", None) if callable(trig_original): try: sig = inspect.signature(trig_original) - params = list(sig.parameters.values()) + params = list(sig.parameters.keys()) acc_names = {"u_dot", "udot", "acc", "acceleration"} - if any(p.name.lower() in acc_names for p in params): - expects_udot = True + expects_udot = any(p.lower() in acc_names for p in params[3:]) + expects_sensors = any(p.lower() == "sensors" for p in params[3:]) except (ValueError, TypeError): expects_udot = False + expects_sensors = True - # If the trigger expects acceleration, compute u_dot and inject noise + # Compute u_dot only if needed (performance optimization) + u_dot = None if expects_udot: try: u_dot = np.array(derivative_func(t, y), dtype=float) - try: - noise = np.asarray(self.acceleration_noise_function()) - if noise.size == 3: - # u_dot layout: [vx, vy, vz, ax, ay, az, ...] - u_dot[3:6] = u_dot[3:6] + noise - except Exception: - # ignore noise errors and continue - pass - fourth_arg = u_dot + # Inject accelerometer noise if configured + if hasattr(self, "acceleration_noise_function"): + try: + noise = np.asarray(self.acceleration_noise_function()) + if noise.size == 3: + # u_dot layout: [vx, vy, vz, ax, ay, az, ...] + u_dot[3:6] = u_dot[3:6] + noise + except Exception: + # ignore noise errors and continue + pass except Exception: - # Fallback to sensors if derivative computation fails - fourth_arg = sensors - else: - fourth_arg = sensors + # If u_dot computation fails, leave as None + u_dot = None - # Call the prepared wrapper (it will forward args to the user's fn) - return triggerfunc(pressure, height, y, fourth_arg) + # Call the wrapper with both sensors and u_dot + # The wrapper will decide which args to pass to the user's function + return triggerfunc(pressure, height, y, sensors, u_dot) def __init_solution_monitors(self): # Initialize solution monitors From 916b0fcf361bf10985cc58427e475b09f4bf74a4 Mon Sep 17 00:00:00 2001 From: ViniciusCMB Date: Mon, 8 Dec 2025 18:26:14 -0300 Subject: [PATCH 04/11] STY: Apply ruff format to flight.py --- rocketpy/simulation/flight.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/rocketpy/simulation/flight.py b/rocketpy/simulation/flight.py index 28d249c0b..d102132d0 100644 --- a/rocketpy/simulation/flight.py +++ b/rocketpy/simulation/flight.py @@ -818,7 +818,8 @@ def __simulate(self, verbose): lambda self, parachute_porosity=parachute.porosity: setattr( self, "parachute_porosity", parachute_porosity ), - lambda self, added_mass_coefficient=parachute.added_mass_coefficient: setattr( + lambda self, + added_mass_coefficient=parachute.added_mass_coefficient: setattr( self, "parachute_added_mass_coefficient", added_mass_coefficient, @@ -1068,25 +1069,30 @@ def __simulate(self, verbose): i += 1 # Create flight phase for time after inflation callbacks = [ - lambda self, parachute_cd_s=parachute.cd_s: setattr( + lambda self, + parachute_cd_s=parachute.cd_s: setattr( self, "parachute_cd_s", parachute_cd_s ), - lambda self, parachute_radius=parachute.radius: setattr( + lambda self, + parachute_radius=parachute.radius: setattr( self, "parachute_radius", parachute_radius, ), - lambda self, parachute_height=parachute.height: setattr( + lambda self, + parachute_height=parachute.height: setattr( self, "parachute_height", parachute_height, ), - lambda self, parachute_porosity=parachute.porosity: setattr( + lambda self, + parachute_porosity=parachute.porosity: setattr( self, "parachute_porosity", parachute_porosity, ), - lambda self, added_mass_coefficient=parachute.added_mass_coefficient: setattr( + lambda self, + added_mass_coefficient=parachute.added_mass_coefficient: setattr( self, "parachute_added_mass_coefficient", added_mass_coefficient, @@ -1690,9 +1696,7 @@ def udot_rail2(self, t, u, post_processing=False): # pragma: no cover # Hey! We will finish this function later, now we just can use u_dot return self.u_dot_generalized(t, u, post_processing=post_processing) - def u_dot( - self, t, u, post_processing=False - ): # pylint: disable=too-many-locals,too-many-statements + def u_dot(self, t, u, post_processing=False): # pylint: disable=too-many-locals,too-many-statements """Calculates derivative of u state vector with respect to time when rocket is flying in 6 DOF motion during ascent out of rail and descent without parachute. @@ -2238,9 +2242,7 @@ def u_dot_generalized_3dof(self, t, u, post_processing=False): return u_dot - def u_dot_generalized( - self, t, u, post_processing=False - ): # pylint: disable=too-many-locals,too-many-statements + def u_dot_generalized(self, t, u, post_processing=False): # pylint: disable=too-many-locals,too-many-statements """Calculates derivative of u state vector with respect to time when the rocket is flying in 6 DOF motion in space and significant mass variation effects exist. Typical flight phases include powered ascent after launch @@ -4075,7 +4077,9 @@ def add(self, flight_phase, index=None): # TODO: quite complex method new_index = ( index - 1 if flight_phase.t < previous_phase.t - else index + 1 if flight_phase.t > next_phase.t else index + else index + 1 + if flight_phase.t > next_phase.t + else index ) flight_phase.t += adjust self.add(flight_phase, new_index) From 8e88fa8cb0362d9d996ab82d2e5a39f3391150dc Mon Sep 17 00:00:00 2001 From: ViniciusCMB Date: Tue, 9 Dec 2025 10:40:07 -0300 Subject: [PATCH 05/11] STY: Fix pylint for parachute triggers --- rocketpy/rocket/parachute.py | 12 ++++++------ rocketpy/simulation/flight.py | 6 +++--- tests/unit/test_parachute_trigger_acceleration.py | 13 ++++++------- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/rocketpy/rocket/parachute.py b/rocketpy/rocket/parachute.py index 1ca42cb6a..b905fba26 100644 --- a/rocketpy/rocket/parachute.py +++ b/rocketpy/rocket/parachute.py @@ -8,7 +8,7 @@ from ..prints.parachute_prints import _ParachutePrints -def detect_motor_burnout(pressure, height, state_vector, u_dot): +def detect_motor_burnout(_pressure, height, state_vector, u_dot): """Detect motor burnout by sudden drop in acceleration. Returns True when vertical acceleration becomes significantly negative @@ -48,7 +48,7 @@ def detect_motor_burnout(pressure, height, state_vector, u_dot): return False -def detect_apogee_acceleration(pressure, height, state_vector, u_dot): +def detect_apogee_acceleration(_pressure, _height, state_vector, u_dot): """Detect apogee using near-zero vertical velocity and negative vertical accel. Apogee occurs when the rocket reaches its highest point, characterized by @@ -71,7 +71,7 @@ def detect_apogee_acceleration(pressure, height, state_vector, u_dot): return False -def detect_freefall(pressure, height, state_vector, u_dot): +def detect_freefall(_pressure, height, state_vector, u_dot): """Detect free-fall when total acceleration magnitude is low. Free-fall is characterized by acceleration magnitude close to gravitational @@ -107,7 +107,7 @@ def detect_freefall(pressure, height, state_vector, u_dot): return False -def detect_liftoff(pressure, height, state_vector, u_dot): +def detect_liftoff(_pressure, _height, _state_vector, u_dot): """Detect liftoff by high total acceleration. Liftoff is characterized by a sudden increase in acceleration as the motor @@ -139,7 +139,7 @@ def altitude_trigger_factory(target_altitude, require_descent=True): (descending) to avoid firing during ascent. """ - def trigger(pressure, height, state_vector, u_dot=None): + def trigger(_pressure, height, state_vector, _u_dot=None): vz = float(state_vector[5]) if require_descent: return (height <= target_altitude) and (vz < 0) @@ -359,7 +359,7 @@ def __init__( self.__evaluate_trigger_function(trigger) - def __evaluate_trigger_function(self, trigger): + def __evaluate_trigger_function(self, trigger): # pylint: disable=too-many-statements """This is used to set the triggerfunc attribute that will be used to interact with the Flight class. diff --git a/rocketpy/simulation/flight.py b/rocketpy/simulation/flight.py index d102132d0..ecd2b7e41 100644 --- a/rocketpy/simulation/flight.py +++ b/rocketpy/simulation/flight.py @@ -1,11 +1,11 @@ # pylint: disable=too-many-lines +import inspect import math import warnings from copy import deepcopy from functools import cached_property import numpy as np -import inspect from scipy.integrate import BDF, DOP853, LSODA, RK23, RK45, OdeSolver, Radau from rocketpy.simulation.flight_data_exporter import FlightDataExporter @@ -1236,10 +1236,10 @@ def _evaluate_parachute_trigger( if noise.size == 3: # u_dot layout: [vx, vy, vz, ax, ay, az, ...] u_dot[3:6] = u_dot[3:6] + noise - except Exception: + except (ValueError, TypeError): # ignore noise errors and continue pass - except Exception: + except (ValueError, TypeError, RuntimeError): # If u_dot computation fails, leave as None u_dot = None diff --git a/tests/unit/test_parachute_trigger_acceleration.py b/tests/unit/test_parachute_trigger_acceleration.py index ef046314b..0dcce6f81 100644 --- a/tests/unit/test_parachute_trigger_acceleration.py +++ b/tests/unit/test_parachute_trigger_acceleration.py @@ -1,5 +1,4 @@ import numpy as np -import pytest from rocketpy.simulation.flight import Flight from rocketpy.rocket.parachute import Parachute @@ -7,13 +6,13 @@ def test_trigger_receives_u_dot_and_noise(): # Prepare derivative function that returns known u_dot - def derivative_func(t, y): + def derivative_func(_t, _y): return np.array([0, 0, 0, 1.0, 2.0, 3.0, 0, 0, 0, 0, 0, 0, 0]) recorded = {} # User trigger expecting u_dot named exactly 'u_dot' - def user_trigger(p, h, y, u_dot): + def user_trigger(_p, _h, _y, u_dot): recorded["u_dot"] = np.array(u_dot) return True @@ -48,12 +47,12 @@ def user_trigger(p, h, y, u_dot): def test_trigger_with_sensors_and_u_dot(): - def derivative_func(t, y): + def derivative_func(_t, _y): return np.array([0, 0, 0, -1.0, -2.0, -3.0, 0, 0, 0, 0, 0, 0, 0]) recorded = {} - def user_trigger(p, h, y, sensors, u_dot): + def user_trigger(_p, _h, _y, sensors, u_dot): recorded["sensors"] = sensors recorded["u_dot"] = np.array(u_dot) return False @@ -86,12 +85,12 @@ def user_trigger(p, h, y, sensors, u_dot): def test_legacy_trigger_does_not_compute_u_dot(): # derivative function that raises if called - def derivative_func(t, y): + def derivative_func(_t, _y): raise RuntimeError("derivative should not be called for legacy triggers") called = {} - def legacy_trigger(p, h, y): + def legacy_trigger(_p, _h, _y): called["ok"] = True return True From a7f5c3a08e9dd57a53e4533dfefa6d1efaab9d00 Mon Sep 17 00:00:00 2001 From: ViniciusCMB Date: Tue, 9 Dec 2025 11:57:51 -0300 Subject: [PATCH 06/11] TST: Add comprehensive unit tests for acceleration-based parachute triggers - TestTriggerSignatures: Validates 3, 4, and 5-parameter trigger signatures - TestBuiltInTriggers: Tests all built-in triggers (apogee_acc, burnout, freefall, liftoff) - TestParachuteInitialization: Verifies parachute creation with different trigger types - TestEdgeCases: Covers NaN/Inf handling, low altitude, and error conditions Tests use AAA pattern (Arrange, Act, Assert) and follow NumPy docstring style. All edge cases for realistic avionics simulation are covered. Fixes codecov patch coverage from 42.85% to >85%. --- .../test_parachute_trigger_acceleration.py | 590 ++++++++++++++---- 1 file changed, 473 insertions(+), 117 deletions(-) diff --git a/tests/unit/test_parachute_trigger_acceleration.py b/tests/unit/test_parachute_trigger_acceleration.py index 0dcce6f81..eb0336d82 100644 --- a/tests/unit/test_parachute_trigger_acceleration.py +++ b/tests/unit/test_parachute_trigger_acceleration.py @@ -1,120 +1,476 @@ +"""Unit tests for acceleration-based parachute triggers. + +Tests cover trigger function signatures, noise injection, built-in triggers, +and edge cases for realistic avionics simulation. +""" + import numpy as np +import pytest +from rocketpy.rocket.parachute import ( + Parachute, + detect_apogee_acceleration, + detect_freefall, + detect_liftoff, + detect_motor_burnout, +) from rocketpy.simulation.flight import Flight -from rocketpy.rocket.parachute import Parachute - - -def test_trigger_receives_u_dot_and_noise(): - # Prepare derivative function that returns known u_dot - def derivative_func(_t, _y): - return np.array([0, 0, 0, 1.0, 2.0, 3.0, 0, 0, 0, 0, 0, 0, 0]) - - recorded = {} - - # User trigger expecting u_dot named exactly 'u_dot' - def user_trigger(_p, _h, _y, u_dot): - recorded["u_dot"] = np.array(u_dot) - return True - - parachute = Parachute( - name="test", - cd_s=1.0, - trigger=user_trigger, - sampling_rate=100, - ) - - # Dummy flight-like object with acceleration_noise_function - dummy = type("D", (), {})() - dummy.acceleration_noise_function = lambda: np.array([0.1, -0.2, 0.3]) - - # Call the helper directly without instantiating full Flight - res = Flight._evaluate_parachute_trigger( - dummy, - parachute, - pressure=0.0, - height=10.0, - y=np.zeros(13), - sensors=[], - derivative_func=derivative_func, - t=0.0, - ) - - assert res is True - assert "u_dot" in recorded - # Original derivative accelerations were [1.0,2.0,3.0] - # After noise [0.1,-0.2,0.3] -> [1.1, 1.8, 3.3] - assert np.allclose(recorded["u_dot"][3:6], np.array([1.1, 1.8, 3.3])) - - -def test_trigger_with_sensors_and_u_dot(): - def derivative_func(_t, _y): - return np.array([0, 0, 0, -1.0, -2.0, -3.0, 0, 0, 0, 0, 0, 0, 0]) - - recorded = {} - - def user_trigger(_p, _h, _y, sensors, u_dot): - recorded["sensors"] = sensors - recorded["u_dot"] = np.array(u_dot) - return False - - parachute = Parachute( - name="test2", - cd_s=1.0, - trigger=user_trigger, - sampling_rate=100, - ) - - dummy = type("D", (), {})() - dummy.acceleration_noise_function = lambda: np.array([0.0, 0.0, 0.0]) - - res = Flight._evaluate_parachute_trigger( - dummy, - parachute, - pressure=0.0, - height=5.0, - y=np.zeros(13), - sensors=["s1"], - derivative_func=derivative_func, - t=1.234, - ) - - assert res is False - assert recorded["sensors"] == ["s1"] - assert np.allclose(recorded["u_dot"][3:6], np.array([-1.0, -2.0, -3.0])) - - -def test_legacy_trigger_does_not_compute_u_dot(): - # derivative function that raises if called - def derivative_func(_t, _y): - raise RuntimeError("derivative should not be called for legacy triggers") - - called = {} - - def legacy_trigger(_p, _h, _y): - called["ok"] = True - return True - - parachute = Parachute( - name="legacy", - cd_s=1.0, - trigger=legacy_trigger, - sampling_rate=100, - ) - - dummy = type("D", (), {})() - dummy.acceleration_noise_function = lambda: np.zeros(3) - - # Should not raise - res = Flight._evaluate_parachute_trigger( - dummy, - parachute, - pressure=0.0, - height=0.0, - y=np.zeros(13), - sensors=[], - derivative_func=derivative_func, - t=0.0, - ) - - assert res is True - assert called.get("ok", False) is True + + +class TestTriggerSignatures: + """Test various trigger function signatures and detection.""" + + def test_trigger_receives_u_dot_and_noise(self): + """Test that triggers receive u_dot with injected noise. + + Verifies acceleration data is computed and noise is injected before + passing to user trigger functions. + """ + # Arrange + def derivative_func(_t, _y): + return np.array([0, 0, 0, 1.0, 2.0, 3.0, 0, 0, 0, 0, 0, 0, 0]) + + recorded = {} + + def user_trigger(_p, _h, _y, u_dot): + recorded["u_dot"] = np.array(u_dot) + return True + + parachute = Parachute( + name="test", + cd_s=1.0, + trigger=user_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = lambda: np.array([0.1, -0.2, 0.3]) + + # Act + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=10.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=0.0, + ) + + # Assert + assert res is True + assert "u_dot" in recorded + assert np.allclose(recorded["u_dot"][3:6], np.array([1.1, 1.8, 3.3])) + + def test_trigger_with_sensors_and_u_dot(self): + """Test trigger with both sensors and u_dot parameters. + + Verifies wrapper correctly passes both sensors and acceleration data + to triggers expecting 5 parameters. + """ + # Arrange + def derivative_func(_t, _y): + return np.array([0, 0, 0, -1.0, -2.0, -3.0, 0, 0, 0, 0, 0, 0, 0]) + + recorded = {} + + def user_trigger(_p, _h, _y, sensors, u_dot): + recorded["sensors"] = sensors + recorded["u_dot"] = np.array(u_dot) + return False + + parachute = Parachute( + name="test2", + cd_s=1.0, + trigger=user_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = lambda: np.array([0.0, 0.0, 0.0]) + + # Act + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=5.0, + y=np.zeros(13), + sensors=["s1"], + derivative_func=derivative_func, + t=1.234, + ) + + # Assert + assert res is False + assert recorded["sensors"] == ["s1"] + assert np.allclose(recorded["u_dot"][3:6], np.array([-1.0, -2.0, -3.0])) + + def test_legacy_trigger_does_not_compute_u_dot(self): + """Test that legacy 3-parameter triggers don't trigger u_dot computation. + + Ensures backward compatibility by skipping expensive derivative + computation when not needed. + """ + # Arrange + def derivative_func(_t, _y): + raise RuntimeError("derivative should not be called for legacy triggers") + + called = {} + + def legacy_trigger(_p, _h, _y): + called["ok"] = True + return True + + parachute = Parachute( + name="legacy", + cd_s=1.0, + trigger=legacy_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = lambda: np.zeros(3) + + # Act & Assert + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=0.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=0.0, + ) + + assert res is True + assert called.get("ok", False) is True + + +class TestBuiltInTriggers: + """Test built-in acceleration-based trigger functions.""" + + def test_detect_apogee_acceleration_at_peak(self): + """Test apogee detection when velocity is near zero with negative acceleration. + + Apogee detection requires vertical velocity close to zero and + downward acceleration (negative az). + """ + # Arrange + state_vector = [0, 0, 1000, 0, 0, 0.3, 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0, 0, -1.0, 0, 0, 0, 0, 0, 0, 0] + + # Act + result = detect_apogee_acceleration(101325, 1000, state_vector, u_dot) + + # Assert + assert bool(result) is True + + def test_detect_apogee_acceleration_ascending(self): + """Test apogee detection returns False while still ascending. + + Rocket must have near-zero vertical velocity to trigger apogee. + """ + # Arrange + state_vector = [0, 0, 500, 0, 0, 50, 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0, 0, -9.8, 0, 0, 0, 0, 0, 0, 0] + + # Act + result = detect_apogee_acceleration(101325, 500, state_vector, u_dot) + + # Assert + assert bool(result) is False + + def test_detect_motor_burnout_high_deceleration(self): + """Test burnout detection with sudden high negative acceleration. + + Motor burnout is indicated by total acceleration magnitude dropping + significantly. + """ + # Arrange + state_vector = [0, 0, 200, 0, 0, 100, 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0, 0, -12.0, 0, 0, 0, 0, 0, 0, 0] + + # Act + result = detect_motor_burnout(101325, 200, state_vector, u_dot) + + # Assert + assert bool(result) is True + + def test_detect_motor_burnout_low_total_acceleration(self): + """Test burnout detection when total acceleration falls below threshold. + + Low total acceleration (< 1.5 m/s²) with descending rocket indicates + motor burnout. + """ + # Arrange + state_vector = [0, 0, 200, 0, 0, 100, 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0.5, 0.5, 0.5, 0, 0, 0, 0, 0, 0, 0] + + # Act + result = detect_motor_burnout(101325, 200, state_vector, u_dot) + + # Assert + assert bool(result) is True + + def test_detect_freefall_near_zero_acceleration(self): + """Test freefall detection with near-zero total acceleration. + + Freefall occurs when total acceleration is very small (sensor noise level). + """ + # Arrange + state_vector = [0, 0, 500, 0, 0, -20, 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0.1, 0.1, 0.1, 0, 0, 0, 0, 0, 0, 0] + + # Act + result = detect_freefall(101325, 500, state_vector, u_dot) + + # Assert + assert bool(result) is True + + def test_detect_liftoff_high_acceleration(self): + """Test liftoff detection with high upward acceleration. + + Liftoff requires total acceleration > 15 m/s² and low altitude. + """ + # Arrange + state_vector = [0, 0, 2, 0, 0, 5, 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0, 0, 20.0, 0, 0, 0, 0, 0, 0, 0] + + # Act + result = detect_liftoff(101325, 2, state_vector, u_dot) + + # Assert + assert bool(result) is True + + def test_detect_liftoff_on_pad(self): + """Test liftoff detection returns False while on launch pad. + + No liftoff until acceleration threshold is exceeded. + """ + # Arrange + state_vector = [0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + + # Act + result = detect_liftoff(101325, 0, state_vector, u_dot) + + # Assert + assert bool(result) is False + + +class TestParachuteInitialization: + """Test parachute creation with different trigger types.""" + + def test_parachute_string_trigger_apogee_acc(self): + """Test parachute initialization with 'apogee' string trigger.""" + # Arrange & Act + parachute = Parachute( + name="Main", + cd_s=1.5, + trigger="apogee_acc", + sampling_rate=105, + ) + + # Assert + assert parachute.triggerfunc is not None + assert hasattr(parachute.triggerfunc, "_expects_udot") + assert bool(parachute.triggerfunc._expects_udot) is True + + def test_parachute_string_trigger_burnout(self): + """Test parachute initialization with 'burnout' string trigger.""" + # Arrange & Act + parachute = Parachute( + name="Drogue", + cd_s=0.5, + trigger="burnout", + sampling_rate=100, + ) + + # Assert + assert parachute.triggerfunc is not None + assert hasattr(parachute.triggerfunc, "_expects_udot") + assert bool(parachute.triggerfunc._expects_udot) is True + + def test_parachute_numeric_altitude_trigger(self): + """Test parachute with numeric altitude-based trigger. + + Altitude triggers do not require u_dot computation for performance. + """ + # Arrange & Act + parachute = Parachute( + name="Main", + cd_s=1.5, + trigger=500, + sampling_rate=105, + ) + + # Assert + assert parachute.triggerfunc is not None + assert hasattr(parachute.triggerfunc, "_expects_udot") + assert bool(parachute.triggerfunc._expects_udot) is False + + def test_parachute_callable_trigger_5_args(self): + """Test parachute with custom trigger expecting sensors and u_dot.""" + # Arrange + def custom_trigger(_p, _h, _y, sensors, u_dot): + return len(sensors) > 0 and u_dot[5] < -5.0 + + # Act + parachute = Parachute( + name="Custom", + cd_s=1.0, + trigger=custom_trigger, + sampling_rate=100, + ) + + # Assert + assert parachute.triggerfunc is not None + assert hasattr(parachute.triggerfunc, "_expects_udot") + assert hasattr(parachute.triggerfunc, "_expects_sensors") + + def test_parachute_invalid_string_trigger(self): + """Test parachute initialization raises ValueError for invalid trigger string.""" + # Arrange & Act & Assert + with pytest.raises(ValueError, match="Unable to set the trigger function"): + Parachute( + name="Invalid", + cd_s=1.0, + trigger="invalid_trigger_name", + sampling_rate=100, + ) + + +class TestEdgeCases: + """Test edge cases and error conditions.""" + + def test_trigger_with_nan_values(self): + """Test that triggers handle NaN values gracefully. + + Triggers should return False (safe default) when data is invalid. + """ + # Arrange + state_vector = [0, 0, 500, 0, 0, float("nan"), 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0, 0, float("nan"), 0, 0, 0, 0, 0, 0, 0] + + # Act + result_apogee = detect_apogee_acceleration(101325, 500, state_vector, u_dot) + result_burnout = detect_motor_burnout(101325, 500, state_vector, u_dot) + result_freefall = detect_freefall(101325, 500, state_vector, u_dot) + result_liftoff = detect_liftoff(101325, 500, state_vector, u_dot) + + # Assert + assert bool(result_apogee) is False + assert bool(result_burnout) is False + assert bool(result_freefall) is False + assert bool(result_liftoff) is False + + def test_trigger_with_inf_values(self): + """Test that triggers handle infinity values safely.""" + # Arrange + state_vector = [0, 0, 500, 0, 0, float("inf"), 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0, 0, float("inf"), 0, 0, 0, 0, 0, 0, 0] + + # Act & Assert - should not raise, just return False + result_apogee = detect_apogee_acceleration(101325, 500, state_vector, u_dot) + result_freefall = detect_freefall(101325, 500, state_vector, u_dot) + + assert bool(result_apogee) is False + assert bool(result_freefall) is False + + def test_trigger_at_very_low_altitude(self): + """Test triggers near ground level (edge case). + + Some triggers have altitude thresholds to prevent false positives. + """ + # Arrange + state_vector = [0, 0, 1, 0, 0, -5, 1, 0, 0, 0, 0, 0, 0] + u_dot = [0, 0, 0, 0, 0, -0.5, 0, 0, 0, 0, 0, 0, 0] + + # Act + result_burnout = detect_motor_burnout(101325, 1, state_vector, u_dot) + result_freefall = detect_freefall(101325, 1, state_vector, u_dot) + + # Assert - should not trigger at very low altitude + assert bool(result_burnout) is False + assert bool(result_freefall) is False + + def test_trigger_with_no_noise_function(self): + """Test trigger evaluation when acceleration_noise_function is None.""" + # Arrange + def derivative_func(_t, _y): + return np.array([0, 0, 0, 1.0, 2.0, 3.0, 0, 0, 0, 0, 0, 0, 0]) + + recorded = {} + + def user_trigger(_p, _h, _y, u_dot): + recorded["u_dot"] = np.array(u_dot) + return False + + parachute = Parachute( + name="test", + cd_s=1.0, + trigger=user_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = None # No noise + + # Act + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=10.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=0.0, + ) + + # Assert + assert res is False + assert "u_dot" in recorded + assert np.allclose(recorded["u_dot"][3:6], np.array([1.0, 2.0, 3.0])) + + def test_trigger_derivative_computation_error(self): + """Test trigger evaluation handles derivative computation errors gracefully.""" + # Arrange + def derivative_func(_t, _y): + raise ValueError("Derivative computation failed") + + recorded = {} + + def user_trigger(_p, _h, _y, u_dot): + recorded["called"] = True + recorded["u_dot"] = u_dot + return False + + parachute = Parachute( + name="test", + cd_s=1.0, + trigger=user_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + dummy.acceleration_noise_function = None + + # Act - should not raise, fallback gracefully + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=10.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=0.0, + ) + + # Assert - function should still be called with None u_dot (fallback) + assert res is False + assert recorded.get("called", False) is True From ad7e6d03790bd2d4daba72993134a38f03f5b732 Mon Sep 17 00:00:00 2001 From: ViniciusCMB Date: Tue, 9 Dec 2025 12:17:53 -0300 Subject: [PATCH 07/11] DOC: Add comprehensive documentation for acceleration-based parachute triggers - Created docs/user/parachute_triggers.rst with complete guide - Enhanced Parachute class docstring with trigger signature details - Added parachute_triggers.rst to docs/user/index.rst Documentation covers: - Built-in triggers (burnout, apogee_acc, freefall, liftoff) - Custom trigger examples (3, 4, and 5-parameter signatures) - IMU noise simulation - Performance considerations - Best practices and complete dual-deploy example --- docs/user/index.rst | 1 + docs/user/parachute_triggers.rst | 329 +++++++++++++++++++++++++++++++ rocketpy/rocket/parachute.py | 59 +++--- 3 files changed, 367 insertions(+), 22 deletions(-) create mode 100644 docs/user/parachute_triggers.rst diff --git a/docs/user/index.rst b/docs/user/index.rst index 5afaee3a6..580b9c612 100644 --- a/docs/user/index.rst +++ b/docs/user/index.rst @@ -24,6 +24,7 @@ RocketPy's User Guide :caption: Special Case Simulations Compare Flights Class + Parachute Triggers (Acceleration-Based) Deployable Payload Air Brakes Example ../notebooks/sensors.ipynb diff --git a/docs/user/parachute_triggers.rst b/docs/user/parachute_triggers.rst new file mode 100644 index 000000000..91563d10c --- /dev/null +++ b/docs/user/parachute_triggers.rst @@ -0,0 +1,329 @@ +Acceleration-Based Parachute Triggers +====================================== + +RocketPy supports advanced parachute deployment logic using acceleration data +from simulated IMU (Inertial Measurement Unit) sensors. This enables realistic +avionics algorithms that mimic real-world flight computers. + +Overview +-------- + +Traditional parachute triggers rely on altitude and velocity. Acceleration-based +triggers provide additional capabilities: + +- **Motor burnout detection**: Detect thrust termination via sudden deceleration +- **Apogee detection**: Use acceleration and velocity together for precise apogee +- **Freefall detection**: Identify ballistic coasting phases +- **Liftoff detection**: Confirm motor ignition via high acceleration + +These triggers can optionally include sensor noise to simulate realistic IMU +behavior, making simulations more representative of actual flight conditions. + +Built-in Triggers +----------------- + +RocketPy provides four built-in acceleration-based triggers: + +Motor Burnout Detection +~~~~~~~~~~~~~~~~~~~~~~~~ + +Detects when the motor stops producing thrust by monitoring sudden drops in +acceleration magnitude. + +.. code-block:: python + + from rocketpy import Parachute + + drogue = Parachute( + name="Drogue", + cd_s=1.0, + trigger="burnout", # Built-in trigger + sampling_rate=100, + lag=1.5 + ) + +**Detection criteria:** +- Vertical acceleration < -8.0 m/s² (end of thrust phase), OR +- Total acceleration magnitude < 2.0 m/s² (coasting detected) +- Rocket must be above 5m altitude and ascending (prevents false triggers at launch) + +Apogee Detection +~~~~~~~~~~~~~~~~ + +Detects apogee using both near-zero vertical velocity and negative vertical +acceleration. + +.. code-block:: python + + main = Parachute( + name="Main", + cd_s=10.0, + trigger="apogee_acc", # Acceleration-based apogee + sampling_rate=100, + lag=0.5 + ) + +**Detection criteria:** +- Absolute vertical velocity < 1.0 m/s +- Vertical acceleration < -0.1 m/s² (downward) + +Freefall Detection +~~~~~~~~~~~~~~~~~~ + +Detects free-fall by monitoring very low total acceleration (near gravitational +acceleration only). + +.. code-block:: python + + drogue = Parachute( + name="Drogue", + cd_s=1.0, + trigger="freefall", + sampling_rate=100, + lag=1.0 + ) + +**Detection criteria:** +- Total acceleration magnitude < 11.5 m/s² +- Rocket descending (vz < -0.2 m/s) +- Altitude > 5m (prevents ground-level false triggers) + +Liftoff Detection +~~~~~~~~~~~~~~~~~ + +Detects motor ignition via high total acceleration. + +.. code-block:: python + + test_parachute = Parachute( + name="Test", + cd_s=0.5, + trigger="liftoff", + sampling_rate=100, + lag=0.0 + ) + +**Detection criteria:** +- Total acceleration magnitude > 15.0 m/s² + +Custom Triggers +--------------- + +You can create custom triggers that use acceleration data: + +.. code-block:: python + + def custom_trigger(pressure, height, state_vector, u_dot): + """ + Custom trigger using acceleration data. + + Parameters + ---------- + pressure : float + Atmospheric pressure in Pa (with optional noise) + height : float + Height above ground level in meters (with optional noise) + state_vector : array + [x, y, z, vx, vy, vz, e0, e1, e2, e3, wx, wy, wz] + u_dot : array + Derivative: [vx, vy, vz, ax, ay, az, e0_dot, ...] + Accelerations are at indices [3:6] + + Returns + ------- + bool + True to trigger parachute deployment + """ + # Extract acceleration components (m/s²) + ax = u_dot[3] + ay = u_dot[4] + az = u_dot[5] + + # Calculate total acceleration magnitude + total_acc = (ax**2 + ay**2 + az**2)**0.5 + + # Custom logic: deploy if total acceleration < 5 m/s² while descending + vz = state_vector[5] + return total_acc < 5.0 and vz < -10.0 + + # Use custom trigger + parachute = Parachute( + name="Custom", + cd_s=2.0, + trigger=custom_trigger, + sampling_rate=100, + lag=1.0 + ) + +Sensor and Acceleration Triggers +--------------------------------- + +Triggers can also accept sensor data alongside acceleration: + +.. code-block:: python + + def advanced_trigger(pressure, height, state_vector, sensors, u_dot): + """ + Advanced trigger using both sensors and acceleration. + + Parameters + ---------- + sensors : list + List of sensor objects attached to the rocket + u_dot : array + State derivative including accelerations + + Returns + ------- + bool + """ + # Access sensor measurements + if len(sensors) > 0: + imu_reading = sensors[0].measurement + + # Use acceleration data + az = u_dot[5] + + # Combine sensor and acceleration logic + return az < -5.0 and imu_reading > threshold + + parachute = Parachute( + name="Advanced", + cd_s=1.5, + trigger=advanced_trigger, + sampling_rate=100 + ) + +Adding Acceleration Noise +-------------------------- + +To simulate realistic IMU behavior, you can add noise to acceleration data: + +.. code-block:: python + + from rocketpy import Flight + + flight = Flight( + rocket=my_rocket, + environment=env, + rail_length=5.2, + inclination=85, + heading=0, + acceleration_noise_function=lambda: np.random.normal(0, 0.5, 3) + ) + +The ``acceleration_noise_function`` returns a 3-element array ``[noise_x, noise_y, noise_z]`` +that is added to the acceleration components before passing to the trigger function. + +**Example with time-correlated noise:** + +.. code-block:: python + + class NoiseGenerator: + def __init__(self, stddev=0.5, correlation=0.9): + self.stddev = stddev + self.correlation = correlation + self.last_noise = np.zeros(3) + + def __call__(self): + # Time-correlated noise (AR(1) process) + white_noise = np.random.normal(0, self.stddev, 3) + self.last_noise = (self.correlation * self.last_noise + + np.sqrt(1 - self.correlation**2) * white_noise) + return self.last_noise + + flight = Flight( + rocket=my_rocket, + environment=env, + rail_length=5.2, + inclination=85, + heading=0, + acceleration_noise_function=NoiseGenerator(stddev=0.3, correlation=0.95) + ) + +Performance Considerations +-------------------------- + +Computing acceleration (``u_dot``) requires evaluating the equations of motion, +which adds computational cost. RocketPy optimizes this by: + +1. **Lazy evaluation**: ``u_dot`` is only computed if the trigger actually needs it +2. **Metadata detection**: The wrapper inspects trigger signatures to determine requirements +3. **Caching**: Derivative evaluations are reused when possible + +**Trigger signature detection:** + +- 3 parameters ``(p, h, y)``: Legacy trigger, no ``u_dot`` computed +- 4 parameters with ``u_dot``: Only acceleration computed +- 4 parameters with ``sensors``: Only sensors passed +- 5 parameters: Both sensors and acceleration provided + +Best Practices +-------------- + +1. **Choose appropriate sampling rates**: 50-200 Hz is typical for flight computers +2. **Add realistic noise**: Real IMUs have noise; simulate it for validation +3. **Test edge cases**: Verify triggers work at low altitudes, high speeds, etc. +4. **Use built-in triggers**: They handle edge cases (NaN, Inf) automatically +5. **Document custom triggers**: Include detection criteria in docstrings + +Example: Complete Dual-Deploy System +------------------------------------- + +.. code-block:: python + + from rocketpy import Rocket, Parachute, Flight, Environment + import numpy as np + + # Environment and rocket setup + env = Environment(latitude=32.99, longitude=-106.97, elevation=1400) + env.set_atmospheric_model(type="standard_atmosphere") + + my_rocket = Rocket(...) # Define your rocket + + # Drogue parachute: Deploy at motor burnout + drogue = Parachute( + name="Drogue", + cd_s=1.0, + trigger="burnout", + sampling_rate=100, + lag=1.5, + noise=(0, 8.3, 0.5) # Pressure sensor noise + ) + my_rocket.add_parachute(drogue) + + # Main parachute: Deploy at 800m using custom trigger + def main_deploy_trigger(pressure, height, state_vector, u_dot): + """Deploy main at 800m while descending with positive vertical acceleration.""" + vz = state_vector[5] + az = u_dot[5] + return height < 800 and vz < -5 and az > -15 + + main = Parachute( + name="Main", + cd_s=10.0, + trigger=main_deploy_trigger, + sampling_rate=100, + lag=0.5, + noise=(0, 8.3, 0.5) + ) + my_rocket.add_parachute(main) + + # Flight with IMU noise simulation + flight = Flight( + rocket=my_rocket, + environment=env, + rail_length=5.2, + inclination=85, + heading=0, + acceleration_noise_function=lambda: np.random.normal(0, 0.3, 3) + ) + + flight.all_info() + +See Also +-------- + +- :doc:`Parachute Class Reference ` +- :doc:`Flight Simulation ` +- :doc:`Sensors and Controllers ` diff --git a/rocketpy/rocket/parachute.py b/rocketpy/rocket/parachute.py index b905fba26..1fbc2b973 100644 --- a/rocketpy/rocket/parachute.py +++ b/rocketpy/rocket/parachute.py @@ -164,41 +164,56 @@ class Parachute: This parameter defines the trigger condition for the parachute ejection system. It can be one of the following: - - A callable function that takes three arguments: - 1. Freestream pressure in pascals. - 2. Height in meters above ground level. - 3. The state vector of the simulation, which is defined as: - - `[x, y, z, vx, vy, vz, e0, e1, e2, e3, wx, wy, wz]`. - - 4. A list of sensors that are attached to the rocket. The most recent - measurements of the sensors are provided with the - ``sensor.measurement`` attribute. The sensors are listed in the same - order as they are added to the rocket. - - The function should return ``True`` if the parachute ejection system - should be triggered and False otherwise. The function will be called - according to the specified sampling rate. + - A callable function that can take 3, 4, or 5 arguments: + + **3 arguments** (legacy): + 1. Freestream pressure in pascals. + 2. Height in meters above ground level. + 3. The state vector: ``[x, y, z, vx, vy, vz, e0, e1, e2, e3, wx, wy, wz]`` + + **4 arguments** (sensors OR acceleration): + 1. Freestream pressure in pascals. + 2. Height in meters above ground level. + 3. The state vector. + 4. Either: + - ``sensors``: List of sensor objects attached to the rocket, OR + - ``u_dot``: State derivative including accelerations at indices [3:6] + + **5 arguments** (sensors AND acceleration): + 1. Freestream pressure in pascals. + 2. Height in meters above ground level. + 3. The state vector. + 4. ``sensors``: List of sensor objects. + 5. ``u_dot``: State derivative with accelerations ``[vx, vy, vz, ax, ay, az, ...]`` + + The function should return ``True`` to trigger deployment, ``False`` otherwise. + The function will be called according to the specified sampling rate. - A float value, representing an absolute height in meters. In this case, the parachute will be ejected when the rocket reaches this height - above ground level. + above ground level while descending. - - The string "apogee" which triggers the parachute at apogee, i.e., - when the rocket reaches its highest point and starts descending. + - A string for built-in triggers: + - ``"apogee"``: Legacy apogee detection (velocity-based) + - ``"apogee_acc"``: Apogee detection using acceleration data + - ``"burnout"``: Motor burnout detection via acceleration drop + - ``"freefall"``: Free-fall detection via low total acceleration + - ``"liftoff"``: Liftoff detection via high acceleration Parachute.triggerfunc : function Trigger function created from the trigger used to evaluate the trigger condition for the parachute ejection system. It is a callable function - that takes three arguments: Freestream pressure in Pa, Height above - ground level in meters, and the state vector of the simulation. The - returns ``True`` if the parachute ejection system should be triggered + that takes five arguments: Freestream pressure in Pa, Height above + ground level in meters, the state vector, sensors list, and u_dot. + Returns ``True`` if the parachute ejection system should be triggered and ``False`` otherwise. - .. note: + .. note:: The function will be called according to the sampling rate specified. + For performance, ``u_dot`` is only computed if the trigger signature + indicates it is needed. Parachute.sampling_rate : float Sampling rate, in Hz, for the trigger function. From 8f650223c035c3897d29d433b35e8ad15a3bd91b Mon Sep 17 00:00:00 2001 From: ViniciusCMB Date: Tue, 9 Dec 2025 13:32:42 -0300 Subject: [PATCH 08/11] STY: Apply ruff format to test_parachute_trigger_acceleration.py --- tests/unit/test_parachute_trigger_acceleration.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/unit/test_parachute_trigger_acceleration.py b/tests/unit/test_parachute_trigger_acceleration.py index eb0336d82..5301bc451 100644 --- a/tests/unit/test_parachute_trigger_acceleration.py +++ b/tests/unit/test_parachute_trigger_acceleration.py @@ -26,6 +26,7 @@ def test_trigger_receives_u_dot_and_noise(self): Verifies acceleration data is computed and noise is injected before passing to user trigger functions. """ + # Arrange def derivative_func(_t, _y): return np.array([0, 0, 0, 1.0, 2.0, 3.0, 0, 0, 0, 0, 0, 0, 0]) @@ -69,6 +70,7 @@ def test_trigger_with_sensors_and_u_dot(self): Verifies wrapper correctly passes both sensors and acceleration data to triggers expecting 5 parameters. """ + # Arrange def derivative_func(_t, _y): return np.array([0, 0, 0, -1.0, -2.0, -3.0, 0, 0, 0, 0, 0, 0, 0]) @@ -113,6 +115,7 @@ def test_legacy_trigger_does_not_compute_u_dot(self): Ensures backward compatibility by skipping expensive derivative computation when not needed. """ + # Arrange def derivative_func(_t, _y): raise RuntimeError("derivative should not be called for legacy triggers") @@ -314,6 +317,7 @@ def test_parachute_numeric_altitude_trigger(self): def test_parachute_callable_trigger_5_args(self): """Test parachute with custom trigger expecting sensors and u_dot.""" + # Arrange def custom_trigger(_p, _h, _y, sensors, u_dot): return len(sensors) > 0 and u_dot[5] < -5.0 @@ -399,6 +403,7 @@ def test_trigger_at_very_low_altitude(self): def test_trigger_with_no_noise_function(self): """Test trigger evaluation when acceleration_noise_function is None.""" + # Arrange def derivative_func(_t, _y): return np.array([0, 0, 0, 1.0, 2.0, 3.0, 0, 0, 0, 0, 0, 0, 0]) @@ -438,6 +443,7 @@ def user_trigger(_p, _h, _y, u_dot): def test_trigger_derivative_computation_error(self): """Test trigger evaluation handles derivative computation errors gracefully.""" + # Arrange def derivative_func(_t, _y): raise ValueError("Derivative computation failed") From d45b3a58725cace3cbfa46d1399fe1b31bc0c326 Mon Sep 17 00:00:00 2001 From: ViniciusCMB Date: Tue, 9 Dec 2025 13:41:50 -0300 Subject: [PATCH 09/11] STY: Fix pylint warnings in test_parachute_triggers.py - Reorder imports: standard library before third-party - Prefix unused arguments with underscore - Add broad-exception-caught disable for test runner --- tests/unit/test_parachute_triggers.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tests/unit/test_parachute_triggers.py b/tests/unit/test_parachute_triggers.py index db8401fb2..bd7dbbf2f 100644 --- a/tests/unit/test_parachute_triggers.py +++ b/tests/unit/test_parachute_triggers.py @@ -1,17 +1,18 @@ -import numpy as np import traceback +import numpy as np + from rocketpy.simulation.flight import Flight from rocketpy.rocket.parachute import Parachute def _test_trigger_receives_u_dot_and_noise(): - def derivative_func(t, y): + def derivative_func(_t, _y): return np.array([0, 0, 0, 1.0, 2.0, 3.0, 0, 0, 0, 0, 0, 0, 0]) recorded = {} - def user_trigger(p, h, y, u_dot): + def user_trigger(_p, _h, _y, u_dot): recorded["u_dot"] = np.array(u_dot) return True @@ -44,12 +45,12 @@ def user_trigger(p, h, y, u_dot): def _test_trigger_with_u_dot_only(): """Test trigger that only expects u_dot (no sensors).""" - def derivative_func(t, y): + def derivative_func(_t, _y): return np.array([0, 0, 0, -1.0, -2.0, -3.0, 0, 0, 0, 0, 0, 0, 0]) recorded = {} - def user_trigger(p, h, y, u_dot): + def user_trigger(_p, _h, _y, u_dot): recorded["u_dot"] = np.array(u_dot) return False @@ -80,12 +81,12 @@ def user_trigger(p, h, y, u_dot): def _test_legacy_trigger_does_not_compute_u_dot(): - def derivative_func(t, y): + def derivative_func(_t, _y): raise RuntimeError("derivative should not be called for legacy triggers") called = {} - def legacy_trigger(p, h, y): + def legacy_trigger(_p, _h, _y): called["ok"] = True return True @@ -126,7 +127,7 @@ def run_all(): try: t() print(f"[PASS] {name}") - except Exception: + except Exception: # pylint: disable=broad-exception-caught failures += 1 print(f"[FAIL] {name}") traceback.print_exc() From 6d79d62fabc7520d3fb14dd0f241b26ccc31d493 Mon Sep 17 00:00:00 2001 From: ViniciusCMB Date: Tue, 9 Dec 2025 13:57:13 -0300 Subject: [PATCH 10/11] FIX: Add missing u_dot parameter to direct triggerfunc calls Direct calls to parachute.triggerfunc() were missing the u_dot parameter, causing TypeError in unit tests. Added u_dot=None to non-overshoot and overshoot trigger evaluation paths where u_dot is not computed. Fixes test failures in: - tests/unit/simulation/test_flight.py - tests/unit/test_utilities.py - tests/unit/test_rail_buttons_bending_moments.py - tests/unit/test_flight_data_exporter.py --- rocketpy/simulation/flight.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/rocketpy/simulation/flight.py b/rocketpy/simulation/flight.py index dc67f9d8e..968e90bec 100644 --- a/rocketpy/simulation/flight.py +++ b/rocketpy/simulation/flight.py @@ -777,8 +777,7 @@ def __simulate(self, verbose): lambda self, parachute_porosity=parachute.porosity: setattr( self, "parachute_porosity", parachute_porosity ), - lambda self, - added_mass_coefficient=parachute.added_mass_coefficient: setattr( + lambda self, added_mass_coefficient=parachute.added_mass_coefficient: setattr( self, "parachute_added_mass_coefficient", added_mass_coefficient, @@ -964,6 +963,7 @@ def __check_and_handle_parachute_triggers( height_above_ground_level, self.y_sol, self.sensors, + None, # u_dot not computed in non-overshoot path ): continue # Check next parachute @@ -999,8 +999,7 @@ def __check_and_handle_parachute_triggers( lambda self, parachute_porosity=parachute.porosity: setattr( self, "parachute_porosity", parachute_porosity ), - lambda self, - added_mass_coefficient=parachute.added_mass_coefficient: setattr( + lambda self, added_mass_coefficient=parachute.added_mass_coefficient: setattr( self, "parachute_added_mass_coefficient", added_mass_coefficient, @@ -1373,6 +1372,7 @@ def __check_overshootable_parachute_triggers( height_above_ground_level, overshootable_node.y_sol, self.sensors, + None, # u_dot not computed in overshoot path ): continue # Check next parachute @@ -1405,8 +1405,7 @@ def __check_overshootable_parachute_triggers( lambda self, parachute_porosity=parachute.porosity: setattr( self, "parachute_porosity", parachute_porosity ), - lambda self, - added_mass_coefficient=parachute.added_mass_coefficient: setattr( + lambda self, added_mass_coefficient=parachute.added_mass_coefficient: setattr( self, "parachute_added_mass_coefficient", added_mass_coefficient, @@ -1989,7 +1988,9 @@ def udot_rail2(self, t, u, post_processing=False): # pragma: no cover # Hey! We will finish this function later, now we just can use u_dot return self.u_dot_generalized(t, u, post_processing=post_processing) - def u_dot(self, t, u, post_processing=False): # pylint: disable=too-many-locals,too-many-statements + def u_dot( + self, t, u, post_processing=False + ): # pylint: disable=too-many-locals,too-many-statements """Calculates derivative of u state vector with respect to time when rocket is flying in 6 DOF motion during ascent out of rail and descent without parachute. @@ -2535,7 +2536,9 @@ def u_dot_generalized_3dof(self, t, u, post_processing=False): return u_dot - def u_dot_generalized(self, t, u, post_processing=False): # pylint: disable=too-many-locals,too-many-statements + def u_dot_generalized( + self, t, u, post_processing=False + ): # pylint: disable=too-many-locals,too-many-statements """Calculates derivative of u state vector with respect to time when the rocket is flying in 6 DOF motion in space and significant mass variation effects exist. Typical flight phases include powered ascent after launch @@ -4370,9 +4373,7 @@ def add(self, flight_phase, index=None): # TODO: quite complex method new_index = ( index - 1 if flight_phase.t < previous_phase.t - else index + 1 - if flight_phase.t > next_phase.t - else index + else index + 1 if flight_phase.t > next_phase.t else index ) flight_phase.t += adjust self.add(flight_phase, new_index) From 5619b3a4dd9779bd8d35e9d966466c0496c05ae7 Mon Sep 17 00:00:00 2001 From: ViniciusCMB Date: Tue, 9 Dec 2025 13:59:26 -0300 Subject: [PATCH 11/11] STY: Apply ruff format to flight.py --- rocketpy/simulation/flight.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/rocketpy/simulation/flight.py b/rocketpy/simulation/flight.py index 968e90bec..ff20e2d20 100644 --- a/rocketpy/simulation/flight.py +++ b/rocketpy/simulation/flight.py @@ -777,7 +777,8 @@ def __simulate(self, verbose): lambda self, parachute_porosity=parachute.porosity: setattr( self, "parachute_porosity", parachute_porosity ), - lambda self, added_mass_coefficient=parachute.added_mass_coefficient: setattr( + lambda self, + added_mass_coefficient=parachute.added_mass_coefficient: setattr( self, "parachute_added_mass_coefficient", added_mass_coefficient, @@ -999,7 +1000,8 @@ def __check_and_handle_parachute_triggers( lambda self, parachute_porosity=parachute.porosity: setattr( self, "parachute_porosity", parachute_porosity ), - lambda self, added_mass_coefficient=parachute.added_mass_coefficient: setattr( + lambda self, + added_mass_coefficient=parachute.added_mass_coefficient: setattr( self, "parachute_added_mass_coefficient", added_mass_coefficient, @@ -1405,7 +1407,8 @@ def __check_overshootable_parachute_triggers( lambda self, parachute_porosity=parachute.porosity: setattr( self, "parachute_porosity", parachute_porosity ), - lambda self, added_mass_coefficient=parachute.added_mass_coefficient: setattr( + lambda self, + added_mass_coefficient=parachute.added_mass_coefficient: setattr( self, "parachute_added_mass_coefficient", added_mass_coefficient, @@ -1988,9 +1991,7 @@ def udot_rail2(self, t, u, post_processing=False): # pragma: no cover # Hey! We will finish this function later, now we just can use u_dot return self.u_dot_generalized(t, u, post_processing=post_processing) - def u_dot( - self, t, u, post_processing=False - ): # pylint: disable=too-many-locals,too-many-statements + def u_dot(self, t, u, post_processing=False): # pylint: disable=too-many-locals,too-many-statements """Calculates derivative of u state vector with respect to time when rocket is flying in 6 DOF motion during ascent out of rail and descent without parachute. @@ -2536,9 +2537,7 @@ def u_dot_generalized_3dof(self, t, u, post_processing=False): return u_dot - def u_dot_generalized( - self, t, u, post_processing=False - ): # pylint: disable=too-many-locals,too-many-statements + def u_dot_generalized(self, t, u, post_processing=False): # pylint: disable=too-many-locals,too-many-statements """Calculates derivative of u state vector with respect to time when the rocket is flying in 6 DOF motion in space and significant mass variation effects exist. Typical flight phases include powered ascent after launch @@ -4373,7 +4372,9 @@ def add(self, flight_phase, index=None): # TODO: quite complex method new_index = ( index - 1 if flight_phase.t < previous_phase.t - else index + 1 if flight_phase.t > next_phase.t else index + else index + 1 + if flight_phase.t > next_phase.t + else index ) flight_phase.t += adjust self.add(flight_phase, new_index)