Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 192 additions & 50 deletions software/control/microscope.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import serial
from pathlib import Path
from typing import Optional, TypeVar
from typing import Optional

import numpy as np

import control._def
from control.core.channel_configuration_mananger import ChannelConfigurationManager
Expand All @@ -16,7 +17,6 @@
from control.piezo import PiezoStage
from control.serial_peripherals import SciMicroscopyLEDArray
from squid.abc import CameraAcquisitionMode, AbstractCamera, AbstractStage, AbstractFilterWheelController
from squid.stage.utils import move_z_axis_to_safety_position
from squid.stage.cephla import CephlaStage
from squid.stage.prior import PriorStage
import control.celesta
Expand All @@ -36,17 +36,17 @@
ObjectiveChanger2PosController_Simulation,
)
else:
ObjectiveChanger2PosController = TypeVar("ObjectiveChanger2PosController")
ObjectiveChanger2PosController = None

if control._def.RUN_FLUIDICS:
from control.fluidics import Fluidics
else:
Fluidics = TypeVar("Fluidics")
Fluidics = None

if control._def.ENABLE_NL5:
import control.NL5 as NL5
else:
NL5 = TypeVar("NL5")
NL5 = None
Comment on lines +39 to +49
Copy link

Copilot AI Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TypeVar to None replacement creates type checking issues. When these classes are set to None (feature disabled), type hints like Optional[ObjectiveChanger2PosController] become Optional[None], which is semantically incorrect. Consider using a string literal type or proper type stubs for conditional imports, or use TYPE_CHECKING to conditionally import these for type hints only.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Alpaca233 should it be TypeVar or None?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is what CC says
Analysis: This is a valid concern. When ObjectiveChanger2PosController = None, the type hint Optional[ObjectiveChanger2PosController] becomes Optional[None] which equals None - not useful for type checkers. However, the original TypeVar usage was also incorrect (TypeVar is for generics, not placeholders). A proper fix would use TYPE_CHECKING pattern.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using None is fine because we don't need type checking when we are not using these classes



class MicroscopeAddons:
Expand Down Expand Up @@ -211,7 +211,7 @@ def prepare_for_use(self):

class Microscope:
@staticmethod
def build_from_global_config(simulated: bool = False):
def build_from_global_config(simulated: bool = False) -> "Microscope":
low_level_devices = LowLevelDrivers.build_from_global_config(simulated)

stage_config = squid.config.get_stage_config()
Expand Down Expand Up @@ -239,7 +239,7 @@ def acquisition_camera_hw_trigger_fn(illumination_time: Optional[float]) -> bool
f"Sending hw trigger with illumination_time={illumination_time_us if illumination_time else None} [us]"
)
low_level_devices.microcontroller.send_hardware_trigger(
True if illumination_time else False, illumination_time_us
illumination_time is not None, illumination_time_us
)
return True

Expand Down Expand Up @@ -307,7 +307,6 @@ def __init__(
simulated: bool = False,
skip_prepare_for_use: bool = False,
):
super().__init__()
self._log = squid.logging.get_logger(self.__class__.__name__)

self.stage: AbstractStage = stage
Expand Down Expand Up @@ -445,20 +444,35 @@ def is_confocal_mode(self) -> bool:
"""
return self.channel_configuration_mananger.is_confocal_mode()

def update_camera_functions(self, functions: StreamHandlerFunctions):
def update_camera_functions(self, functions: StreamHandlerFunctions) -> None:
"""Update the stream handler callback functions for the main camera.

Args:
functions: New callback functions for frame handling.
"""
self.stream_handler.set_functions(functions)

def update_camera_focus_functions(self, functions: StreamHandlerFunctions):
def update_camera_focus_functions(self, functions: StreamHandlerFunctions) -> None:
"""Update the stream handler callback functions for the focus camera.

Args:
functions: New callback functions for frame handling.

Raises:
ValueError: If no focus camera is configured.
"""
if not self.addons.camera_focus:
raise ValueError("No focus camera, cannot change its stream handler functions.")

self.stream_handler_focus.set_functions(functions)

def initialize_core_components(self):
def initialize_core_components(self) -> None:
"""Initialize and home core hardware components like piezo stage."""
if self.addons.piezo_stage:
self.addons.piezo_stage.home()

def setup_hardware(self):
def setup_hardware(self) -> None:
"""Set up camera frame callbacks and start streaming for focus camera if present."""
self.camera.add_frame_callback(self.stream_handler.on_new_frame)
self.camera.enable_callbacks(True)

Expand All @@ -467,29 +481,50 @@ def setup_hardware(self):
self.addons.camera_focus.enable_callbacks(True)
self.addons.camera_focus.start_streaming()

def acquire_image(self):
def acquire_image(self) -> np.ndarray:
"""Acquire a single image from the camera.

Turns on illumination, triggers the camera, reads the frame, and turns off
illumination. The trigger mode (software vs hardware) is determined by the
live controller configuration.

Returns:
The acquired image as a numpy array.

Raises:
RuntimeError: If the camera fails to return a frame.
"""
using_software_trigger = self.live_controller.trigger_mode == control._def.TriggerMode.SOFTWARE

# turn on illumination and send trigger
if self.live_controller.trigger_mode == control._def.TriggerMode.SOFTWARE:
if using_software_trigger:
self.live_controller.turn_on_illumination()
self.waitForMicrocontroller()
self._wait_for_microcontroller()
self.camera.send_trigger()
elif self.live_controller.trigger_mode == control._def.TriggerMode.HARDWARE:
self.low_level_drivers.microcontroller.send_hardware_trigger(
control_illumination=True, illumination_on_time_us=self.camera.get_exposure_time() * 1000
)

# read a frame from camera
image = self.camera.read_frame()
if image is None:
print("self.camera.read_frame() returned None")

# turn off the illumination if using software trigger
if self.live_controller.trigger_mode == control._def.TriggerMode.SOFTWARE:
self.live_controller.turn_off_illumination()

return image

def home_xyz(self):
try:
# read a frame from camera
image = self.camera.read_frame()
if image is None:
self._log.error("camera.read_frame() returned None")
raise RuntimeError("Failed to acquire image: camera.read_frame() returned None")
return image
finally:
# always turn off illumination when using software trigger
if using_software_trigger:
self.live_controller.turn_off_illumination()

def home_xyz(self) -> None:
"""Home the X, Y, and Z axes based on configuration settings.

Homes Z first if enabled, then performs a coordinated X/Y homing sequence
that avoids the plate clamp actuation post by moving Y first, homing X,
moving X clear, then homing Y.
"""
if control._def.HOMING_ENABLED_Z:
self.stage.home(x=False, y=False, z=True, theta=False)
if control._def.HOMING_ENABLED_X and control._def.HOMING_ENABLED_Y:
Expand All @@ -510,70 +545,177 @@ def home_xyz(self):
self._log.info("Homing the Y axis...")
self.stage.home(x=False, y=True, z=False, theta=False)

def move_x(self, distance, blocking=True):
def move_x(self, distance: float, blocking: bool = True) -> None:
"""Move the stage by a relative distance along the X axis.

Args:
distance: Distance to move in mm (positive or negative).
blocking: If True, wait for movement to complete before returning.
"""
self.stage.move_x(distance, blocking=blocking)

def move_y(self, distance, blocking=True):
def move_y(self, distance: float, blocking: bool = True) -> None:
"""Move the stage by a relative distance along the Y axis.

Args:
distance: Distance to move in mm (positive or negative).
blocking: If True, wait for movement to complete before returning.
"""
self.stage.move_y(distance, blocking=blocking)

def move_x_to(self, position, blocking=True):
def move_x_to(self, position: float, blocking: bool = True) -> None:
"""Move the stage to an absolute X position.

Args:
position: Target position in mm.
blocking: If True, wait for movement to complete before returning.
"""
self.stage.move_x_to(position, blocking=blocking)

def move_y_to(self, position, blocking=True):
def move_y_to(self, position: float, blocking: bool = True) -> None:
"""Move the stage to an absolute Y position.

Args:
position: Target position in mm.
blocking: If True, wait for movement to complete before returning.
"""
self.stage.move_y_to(position, blocking=blocking)

def get_x(self):
def get_x(self) -> float:
"""Get the current X position of the stage.

Returns:
Current X position in mm.
"""
return self.stage.get_pos().x_mm

def get_y(self):
def get_y(self) -> float:
"""Get the current Y position of the stage.

Returns:
Current Y position in mm.
"""
return self.stage.get_pos().y_mm

def get_z(self):
def get_z(self) -> float:
"""Get the current Z position of the stage.

Returns:
Current Z position in mm.
"""
return self.stage.get_pos().z_mm

def move_z_to(self, z_mm, blocking=True):
self.stage.move_z_to(z_mm)
def move_z_to(self, z_mm: float, blocking: bool = True) -> None:
"""Move the stage to an absolute Z position.

Args:
z_mm: Target position in mm.
blocking: If True, wait for movement to complete before returning.
"""
self.stage.move_z_to(z_mm, blocking=blocking)

def start_live(self):
def start_live(self) -> None:
"""Start live view streaming from the camera."""
self.camera.start_streaming()
self.live_controller.start_live()

def stop_live(self):
def stop_live(self) -> None:
"""Stop live view streaming from the camera."""
self.live_controller.stop_live()
self.camera.stop_streaming()

def waitForMicrocontroller(self, timeout=5.0, error_message=None):
def _wait_for_microcontroller(self, timeout: float = 5.0, error_message: Optional[str] = None) -> None:
"""Wait for the microcontroller to complete the current operation.

Args:
timeout: Maximum time to wait in seconds.
error_message: Custom error message for timeout errors.

Raises:
TimeoutError: If operation does not complete within timeout.
"""
try:
self.low_level_drivers.microcontroller.wait_till_operation_is_completed(timeout)
except TimeoutError as e:
self._log.error(error_message or "Microcontroller operation timed out!")
raise e

def close(self):
self.stop_live()
self.low_level_drivers.microcontroller.close()
def close(self) -> None:
"""Close the microscope and release all hardware resources.

Attempts to cleanly shut down all hardware components. Errors during
shutdown are logged but do not prevent other components from being closed.
"""
try:
self.stop_live()
except Exception as e:
self._log.warning(f"Error stopping live view during close: {e}")

if self.low_level_drivers.microcontroller:
try:
self.low_level_drivers.microcontroller.close()
except Exception as e:
self._log.warning(f"Error closing microcontroller: {e}")

if self.addons.emission_filter_wheel:
self.addons.emission_filter_wheel.close()
try:
self.addons.emission_filter_wheel.close()
except Exception as e:
self._log.warning(f"Error closing emission filter wheel: {e}")

if self.addons.camera_focus:
self.addons.camera_focus.close()
self.camera.close()
try:
self.addons.camera_focus.close()
except Exception as e:
self._log.warning(f"Error closing focus camera: {e}")

def move_to_position(self, x, y, z):
try:
self.camera.close()
except Exception as e:
self._log.warning(f"Error closing camera: {e}")

def move_to_position(self, x: float, y: float, z: float) -> None:
"""Move the stage to an absolute XYZ position.

Args:
x: Target X position in mm.
y: Target Y position in mm.
z: Target Z position in mm.
"""
self.move_x_to(x)
self.move_y_to(y)
self.move_z_to(z)

def set_objective(self, objective):
def set_objective(self, objective: str) -> None:
"""Set the current objective lens.

Args:
objective: Name of the objective to set as current.
"""
self.objective_store.set_current_objective(objective)

def set_illumination_intensity(self, channel, intensity, objective=None):
def set_illumination_intensity(self, channel: str, intensity: float, objective: Optional[str] = None) -> None:
"""Set the illumination intensity for a channel.

Args:
channel: Name of the channel.
intensity: Illumination intensity value.
objective: Objective name. If None, uses current objective.
"""
if objective is None:
objective = self.objective_store.current_objective
channel_config = self.channel_configuration_mananger.get_channel_configuration_by_name(objective, channel)
channel_config.illumination_intensity = intensity
self.live_controller.set_microscope_mode(channel_config)

def set_exposure_time(self, channel, exposure_time, objective=None):
def set_exposure_time(self, channel: str, exposure_time: float, objective: Optional[str] = None) -> None:
"""Set the exposure time for a channel.

Args:
channel: Name of the channel.
exposure_time: Exposure time in milliseconds.
objective: Objective name. If None, uses current objective.
"""
if objective is None:
objective = self.objective_store.current_objective
channel_config = self.channel_configuration_mananger.get_channel_configuration_by_name(objective, channel)
Expand Down