diff --git a/software/control/microscope.py b/software/control/microscope.py index e9586276e..0bf4ea8d2 100644 --- a/software/control/microscope.py +++ b/software/control/microscope.py @@ -1,6 +1,7 @@ -import serial from pathlib import Path -from typing import Optional, TypeVar +from typing import Optional + +import numpy as np import control._def from control.core.channel_configuration_mananger import ChannelConfigurationManager @@ -16,7 +17,6 @@ from control.piezo import PiezoStage from control.serial_peripherals import SciMicroscopyLEDArray from squid.abc import CameraAcquisitionMode, AbstractCamera, AbstractStage, AbstractFilterWheelController -from squid.stage.utils import move_z_axis_to_safety_position from squid.stage.cephla import CephlaStage from squid.stage.prior import PriorStage import control.celesta @@ -36,17 +36,17 @@ ObjectiveChanger2PosController_Simulation, ) else: - ObjectiveChanger2PosController = TypeVar("ObjectiveChanger2PosController") + ObjectiveChanger2PosController = None if control._def.RUN_FLUIDICS: from control.fluidics import Fluidics else: - Fluidics = TypeVar("Fluidics") + Fluidics = None if control._def.ENABLE_NL5: import control.NL5 as NL5 else: - NL5 = TypeVar("NL5") + NL5 = None class MicroscopeAddons: @@ -211,7 +211,7 @@ def prepare_for_use(self): class Microscope: @staticmethod - def build_from_global_config(simulated: bool = False): + def build_from_global_config(simulated: bool = False) -> "Microscope": low_level_devices = LowLevelDrivers.build_from_global_config(simulated) stage_config = squid.config.get_stage_config() @@ -239,7 +239,7 @@ def acquisition_camera_hw_trigger_fn(illumination_time: Optional[float]) -> bool f"Sending hw trigger with illumination_time={illumination_time_us if illumination_time else None} [us]" ) low_level_devices.microcontroller.send_hardware_trigger( - True if illumination_time else False, illumination_time_us + illumination_time is not None, illumination_time_us ) return True @@ -307,7 +307,6 @@ def __init__( simulated: bool = False, skip_prepare_for_use: bool = False, ): - super().__init__() self._log = squid.logging.get_logger(self.__class__.__name__) self.stage: AbstractStage = stage @@ -445,20 +444,35 @@ def is_confocal_mode(self) -> bool: """ return self.channel_configuration_mananger.is_confocal_mode() - def update_camera_functions(self, functions: StreamHandlerFunctions): + def update_camera_functions(self, functions: StreamHandlerFunctions) -> None: + """Update the stream handler callback functions for the main camera. + + Args: + functions: New callback functions for frame handling. + """ self.stream_handler.set_functions(functions) - def update_camera_focus_functions(self, functions: StreamHandlerFunctions): + def update_camera_focus_functions(self, functions: StreamHandlerFunctions) -> None: + """Update the stream handler callback functions for the focus camera. + + Args: + functions: New callback functions for frame handling. + + Raises: + ValueError: If no focus camera is configured. + """ if not self.addons.camera_focus: raise ValueError("No focus camera, cannot change its stream handler functions.") self.stream_handler_focus.set_functions(functions) - def initialize_core_components(self): + def initialize_core_components(self) -> None: + """Initialize and home core hardware components like piezo stage.""" if self.addons.piezo_stage: self.addons.piezo_stage.home() - def setup_hardware(self): + def setup_hardware(self) -> None: + """Set up camera frame callbacks and start streaming for focus camera if present.""" self.camera.add_frame_callback(self.stream_handler.on_new_frame) self.camera.enable_callbacks(True) @@ -467,29 +481,50 @@ def setup_hardware(self): self.addons.camera_focus.enable_callbacks(True) self.addons.camera_focus.start_streaming() - def acquire_image(self): + def acquire_image(self) -> np.ndarray: + """Acquire a single image from the camera. + + Turns on illumination, triggers the camera, reads the frame, and turns off + illumination. The trigger mode (software vs hardware) is determined by the + live controller configuration. + + Returns: + The acquired image as a numpy array. + + Raises: + RuntimeError: If the camera fails to return a frame. + """ + using_software_trigger = self.live_controller.trigger_mode == control._def.TriggerMode.SOFTWARE + # turn on illumination and send trigger - if self.live_controller.trigger_mode == control._def.TriggerMode.SOFTWARE: + if using_software_trigger: self.live_controller.turn_on_illumination() - self.waitForMicrocontroller() + self._wait_for_microcontroller() self.camera.send_trigger() elif self.live_controller.trigger_mode == control._def.TriggerMode.HARDWARE: self.low_level_drivers.microcontroller.send_hardware_trigger( control_illumination=True, illumination_on_time_us=self.camera.get_exposure_time() * 1000 ) - # read a frame from camera - image = self.camera.read_frame() - if image is None: - print("self.camera.read_frame() returned None") - - # turn off the illumination if using software trigger - if self.live_controller.trigger_mode == control._def.TriggerMode.SOFTWARE: - self.live_controller.turn_off_illumination() - - return image - - def home_xyz(self): + try: + # read a frame from camera + image = self.camera.read_frame() + if image is None: + self._log.error("camera.read_frame() returned None") + raise RuntimeError("Failed to acquire image: camera.read_frame() returned None") + return image + finally: + # always turn off illumination when using software trigger + if using_software_trigger: + self.live_controller.turn_off_illumination() + + def home_xyz(self) -> None: + """Home the X, Y, and Z axes based on configuration settings. + + Homes Z first if enabled, then performs a coordinated X/Y homing sequence + that avoids the plate clamp actuation post by moving Y first, homing X, + moving X clear, then homing Y. + """ if control._def.HOMING_ENABLED_Z: self.stage.home(x=False, y=False, z=True, theta=False) if control._def.HOMING_ENABLED_X and control._def.HOMING_ENABLED_Y: @@ -510,70 +545,177 @@ def home_xyz(self): self._log.info("Homing the Y axis...") self.stage.home(x=False, y=True, z=False, theta=False) - def move_x(self, distance, blocking=True): + def move_x(self, distance: float, blocking: bool = True) -> None: + """Move the stage by a relative distance along the X axis. + + Args: + distance: Distance to move in mm (positive or negative). + blocking: If True, wait for movement to complete before returning. + """ self.stage.move_x(distance, blocking=blocking) - def move_y(self, distance, blocking=True): + def move_y(self, distance: float, blocking: bool = True) -> None: + """Move the stage by a relative distance along the Y axis. + + Args: + distance: Distance to move in mm (positive or negative). + blocking: If True, wait for movement to complete before returning. + """ self.stage.move_y(distance, blocking=blocking) - def move_x_to(self, position, blocking=True): + def move_x_to(self, position: float, blocking: bool = True) -> None: + """Move the stage to an absolute X position. + + Args: + position: Target position in mm. + blocking: If True, wait for movement to complete before returning. + """ self.stage.move_x_to(position, blocking=blocking) - def move_y_to(self, position, blocking=True): + def move_y_to(self, position: float, blocking: bool = True) -> None: + """Move the stage to an absolute Y position. + + Args: + position: Target position in mm. + blocking: If True, wait for movement to complete before returning. + """ self.stage.move_y_to(position, blocking=blocking) - def get_x(self): + def get_x(self) -> float: + """Get the current X position of the stage. + + Returns: + Current X position in mm. + """ return self.stage.get_pos().x_mm - def get_y(self): + def get_y(self) -> float: + """Get the current Y position of the stage. + + Returns: + Current Y position in mm. + """ return self.stage.get_pos().y_mm - def get_z(self): + def get_z(self) -> float: + """Get the current Z position of the stage. + + Returns: + Current Z position in mm. + """ return self.stage.get_pos().z_mm - def move_z_to(self, z_mm, blocking=True): - self.stage.move_z_to(z_mm) + def move_z_to(self, z_mm: float, blocking: bool = True) -> None: + """Move the stage to an absolute Z position. + + Args: + z_mm: Target position in mm. + blocking: If True, wait for movement to complete before returning. + """ + self.stage.move_z_to(z_mm, blocking=blocking) - def start_live(self): + def start_live(self) -> None: + """Start live view streaming from the camera.""" self.camera.start_streaming() self.live_controller.start_live() - def stop_live(self): + def stop_live(self) -> None: + """Stop live view streaming from the camera.""" self.live_controller.stop_live() self.camera.stop_streaming() - def waitForMicrocontroller(self, timeout=5.0, error_message=None): + def _wait_for_microcontroller(self, timeout: float = 5.0, error_message: Optional[str] = None) -> None: + """Wait for the microcontroller to complete the current operation. + + Args: + timeout: Maximum time to wait in seconds. + error_message: Custom error message for timeout errors. + + Raises: + TimeoutError: If operation does not complete within timeout. + """ try: self.low_level_drivers.microcontroller.wait_till_operation_is_completed(timeout) except TimeoutError as e: self._log.error(error_message or "Microcontroller operation timed out!") raise e - def close(self): - self.stop_live() - self.low_level_drivers.microcontroller.close() + def close(self) -> None: + """Close the microscope and release all hardware resources. + + Attempts to cleanly shut down all hardware components. Errors during + shutdown are logged but do not prevent other components from being closed. + """ + try: + self.stop_live() + except Exception as e: + self._log.warning(f"Error stopping live view during close: {e}") + + if self.low_level_drivers.microcontroller: + try: + self.low_level_drivers.microcontroller.close() + except Exception as e: + self._log.warning(f"Error closing microcontroller: {e}") + if self.addons.emission_filter_wheel: - self.addons.emission_filter_wheel.close() + try: + self.addons.emission_filter_wheel.close() + except Exception as e: + self._log.warning(f"Error closing emission filter wheel: {e}") + if self.addons.camera_focus: - self.addons.camera_focus.close() - self.camera.close() + try: + self.addons.camera_focus.close() + except Exception as e: + self._log.warning(f"Error closing focus camera: {e}") - def move_to_position(self, x, y, z): + try: + self.camera.close() + except Exception as e: + self._log.warning(f"Error closing camera: {e}") + + def move_to_position(self, x: float, y: float, z: float) -> None: + """Move the stage to an absolute XYZ position. + + Args: + x: Target X position in mm. + y: Target Y position in mm. + z: Target Z position in mm. + """ self.move_x_to(x) self.move_y_to(y) self.move_z_to(z) - def set_objective(self, objective): + def set_objective(self, objective: str) -> None: + """Set the current objective lens. + + Args: + objective: Name of the objective to set as current. + """ self.objective_store.set_current_objective(objective) - def set_illumination_intensity(self, channel, intensity, objective=None): + def set_illumination_intensity(self, channel: str, intensity: float, objective: Optional[str] = None) -> None: + """Set the illumination intensity for a channel. + + Args: + channel: Name of the channel. + intensity: Illumination intensity value. + objective: Objective name. If None, uses current objective. + """ if objective is None: objective = self.objective_store.current_objective channel_config = self.channel_configuration_mananger.get_channel_configuration_by_name(objective, channel) channel_config.illumination_intensity = intensity self.live_controller.set_microscope_mode(channel_config) - def set_exposure_time(self, channel, exposure_time, objective=None): + def set_exposure_time(self, channel: str, exposure_time: float, objective: Optional[str] = None) -> None: + """Set the exposure time for a channel. + + Args: + channel: Name of the channel. + exposure_time: Exposure time in milliseconds. + objective: Objective name. If None, uses current objective. + """ if objective is None: objective = self.objective_store.current_objective channel_config = self.channel_configuration_mananger.get_channel_configuration_by_name(objective, channel)