Skip to content

Add WLAN support #242

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions pslab/bus/busio.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from pslab.bus.i2c import _I2CPrimitive
from pslab.bus.spi import _SPIPrimitive
from pslab.bus.uart import _UARTPrimitive
from pslab.serial_handler import SerialHandler
from pslab.connection import ConnectionHandler

__all__ = (
"I2C",
Expand All @@ -59,7 +59,12 @@ class I2C(_I2CPrimitive):
Frequency of SCL in Hz.
"""

def __init__(self, device: SerialHandler = None, *, frequency: int = 125e3):
def __init__(
self,
device: ConnectionHandler | None = None,
*,
frequency: int = 125e3,
):
# 125 kHz is as low as the PSLab can go.
super().__init__(device)
self._init()
Expand Down Expand Up @@ -199,7 +204,7 @@ class SPI(_SPIPrimitive):
created.
"""

def __init__(self, device: SerialHandler = None):
def __init__(self, device: ConnectionHandler | None = None):
super().__init__(device)
ppre, spre = self._get_prescaler(25e4)
self._set_parameters(ppre, spre, 1, 0, 1)
Expand Down Expand Up @@ -412,7 +417,7 @@ class UART(_UARTPrimitive):

def __init__(
self,
device: SerialHandler = None,
device: ConnectionHandler | None = None,
*,
baudrate: int = 9600,
bits: int = 8,
Expand Down
10 changes: 5 additions & 5 deletions pslab/bus/i2c.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from typing import List

import pslab.protocol as CP
from pslab.serial_handler import SerialHandler
from pslab.connection import ConnectionHandler, autoconnect
from pslab.external.sensorlist import sensors

__all__ = (
Expand Down Expand Up @@ -54,8 +54,8 @@ class _I2CPrimitive:
_READ = 1
_WRITE = 0

def __init__(self, device: SerialHandler = None):
self._device = device if device is not None else SerialHandler()
def __init__(self, device: ConnectionHandler | None = None):
self._device = device if device is not None else autoconnect()
self._running = False
self._mode = None

Expand Down Expand Up @@ -447,7 +447,7 @@ class I2CMaster(_I2CPrimitive):
created.
"""

def __init__(self, device: SerialHandler = None):
def __init__(self, device: ConnectionHandler | None = None):
super().__init__(device)
self._init()
self.configure(125e3) # 125 kHz is as low as the PSLab can go.
Expand Down Expand Up @@ -506,7 +506,7 @@ class I2CSlave(_I2CPrimitive):
def __init__(
self,
address: int,
device: SerialHandler = None,
device: ConnectionHandler | None = None,
):
super().__init__(device)
self.address = address
Expand Down
12 changes: 6 additions & 6 deletions pslab/bus/spi.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import pslab.protocol as CP
from pslab.bus import classmethod_
from pslab.serial_handler import SerialHandler
from pslab.connection import ConnectionHandler, autoconnect

__all__ = (
"SPIMaster",
Expand Down Expand Up @@ -67,8 +67,8 @@ class _SPIPrimitive:
_clock_edge = _CKE # Clock Edge Select bit (inverse of Clock Phase bit).
_smp = _SMP # Data Input Sample Phase bit.

def __init__(self, device: SerialHandler = None):
self._device = device if device is not None else SerialHandler()
def __init__(self, device: ConnectionHandler | None = None):
self._device = device if device is not None else autoconnect()

@classmethod_
@property
Expand Down Expand Up @@ -175,7 +175,7 @@ def _set_parameters(
self._device.send_byte(CP.SET_SPI_PARAMETERS)
# 0Bhgfedcba - > <g>: modebit CKP,<f>: modebit CKE, <ed>:primary prescaler,
# <cba>:secondary prescaler
self._device.send_byte(
self._device.send_int(
secondary_prescaler
| (primary_prescaler << 3)
| (CKE << 5)
Expand Down Expand Up @@ -419,7 +419,7 @@ class SPIMaster(_SPIPrimitive):
created.
"""

def __init__(self, device: SerialHandler = None):
def __init__(self, device: ConnectionHandler | None = None):
super().__init__(device)
# Reset config
self.set_parameters()
Expand Down Expand Up @@ -492,7 +492,7 @@ class SPISlave(_SPIPrimitive):
created.
"""

def __init__(self, device: SerialHandler = None):
def __init__(self, device: ConnectionHandler | None = None):
super().__init__(device)

def transfer8(self, data: int) -> int:
Expand Down
8 changes: 4 additions & 4 deletions pslab/bus/uart.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import pslab.protocol as CP
from pslab.bus import classmethod_
from pslab.serial_handler import SerialHandler
from pslab.connection import ConnectionHandler, autoconnect

__all__ = "UART"
_BRGVAL = 0x22 # BaudRate = 460800.
Expand All @@ -41,8 +41,8 @@ class _UARTPrimitive:
_brgval = _BRGVAL
_mode = _MODE

def __init__(self, device: SerialHandler = None):
self._device = device if device is not None else SerialHandler()
def __init__(self, device: ConnectionHandler | None = None):
self._device = device if device is not None else autoconnect()

@classmethod_
@property
Expand Down Expand Up @@ -227,7 +227,7 @@ class UART(_UARTPrimitive):
Serial connection to PSLab device. If not provided, a new one will be created.
"""

def __init__(self, device: SerialHandler = None):
def __init__(self, device: ConnectionHandler | None = None):
super().__init__(device)
# Reset baudrate and mode
self.configure(self._get_uart_baudrate(_BRGVAL))
Expand Down
74 changes: 74 additions & 0 deletions pslab/connection/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Interfaces for communicating with PSLab devices."""

from serial.tools import list_ports

from .connection import ConnectionHandler
from ._serial import SerialHandler
from .wlan import WLANHandler


def detect() -> list[ConnectionHandler]:
"""Detect PSLab devices.

Returns
-------
devices : list[ConnectionHandler]
Handlers for all detected PSLabs. The returned handlers are disconnected; call
.connect() before use.
"""
regex = []

for vid, pid in zip(SerialHandler._USB_VID, SerialHandler._USB_PID):
regex.append(f"{vid:04x}:{pid:04x}")

regex = "(" + "|".join(regex) + ")"
port_info_generator = list_ports.grep(regex)
pslab_devices = []

for port_info in port_info_generator:
device = SerialHandler(port=port_info.device, baudrate=1000000, timeout=1)

try:
device.connect()
except Exception:
pass # nosec
else:
pslab_devices.append(device)
finally:
device.disconnect()

try:
device = WLANHandler()
device.connect()
except Exception:
pass # nosec
else:
pslab_devices.append(device)
finally:
device.disconnect()

return pslab_devices


def autoconnect() -> ConnectionHandler:
"""Automatically connect when exactly one device is present.

Returns
-------
device : ConnectionHandler
A handler connected to the detected PSLab device. The handler is connected; it
is not necessary to call .connect before use().
"""
devices = detect()

if not devices:
msg = "device not found"
raise ConnectionError(msg)

if len(devices) > 1:
msg = f"autoconnect failed, multiple devices detected: {devices}"
raise ConnectionError(msg)

device = devices[0]
device.connect()
return device
158 changes: 158 additions & 0 deletions pslab/connection/_serial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
"""Serial interface for communicating with PSLab devices."""

import os
import platform

import serial

import pslab
from pslab.connection.connection import ConnectionHandler


def _check_serial_access_permission():
"""Check that we have permission to use the tty on Linux."""
if platform.system() == "Linux":
import grp

if os.geteuid() == 0: # Running as root?
return

for group in os.getgroups():
if grp.getgrgid(group).gr_name in (
"dialout",
"uucp",
):
return

udev_paths = [
"/run/udev/rules.d/",
"/etc/udev/rules.d/",
"/lib/udev/rules.d/",
]
for p in udev_paths:
udev_rules = os.path.join(p, "99-pslab.rules")
if os.path.isfile(udev_rules):
return
else:
raise PermissionError(
"The current user does not have permission to access "
"the PSLab device. To solve this, either:"
"\n\n"
"1. Add the user to the 'dialout' (on Debian-based "
"systems) or 'uucp' (on Arch-based systems) group."
"\n"
"2. Install a udev rule to allow any user access to the "
"device by running 'pslab install' as root, or by "
"manually copying "
f"{pslab.__path__[0]}/99-pslab.rules into {udev_paths[1]}."
"\n\n"
"You may also need to reboot the system for the "
"permission changes to take effect."
)


class SerialHandler(ConnectionHandler):
"""Interface for controlling a PSLab over a serial port.

Parameters
----------
port : str
baudrate : int, default 1 MBd
timeout : float, default 1 s
"""

# V5 V6
_USB_VID = [0x04D8, 0x10C4]
_USB_PID = [0x00DF, 0xEA60]

def __init__(
self,
port: str,
baudrate: int = 1000000,
timeout: float = 1.0,
):
self._port = port
self._ser = serial.Serial(
baudrate=baudrate,
timeout=timeout,
write_timeout=timeout,
)
_check_serial_access_permission()

@property
def port(self) -> str:
"""Serial port."""
return self._port

@property
def baudrate(self) -> int:
"""Symbol rate."""
return self._ser.baudrate

@baudrate.setter
def baudrate(self, value: int) -> None:
self._ser.baudrate = value

@property
def timeout(self) -> float:
"""Timeout in seconds."""
return self._ser.timeout

@timeout.setter
def timeout(self, value: float) -> None:
self._ser.timeout = value
self._ser.write_timeout = value

def connect(self) -> None:
"""Connect to PSLab."""
self._ser.port = self.port
self._ser.open()

try:
self.get_version()
except Exception:
self._ser.close()
raise

def disconnect(self):
"""Disconnect from PSLab."""
self._ser.close()

def read(self, number_of_bytes: int) -> bytes:
"""Read bytes from serial port.

Parameters
----------
number_of_bytes : int
Number of bytes to read from the serial port.

Returns
-------
bytes
Bytes read from the serial port.
"""
return self._ser.read(number_of_bytes)

def write(self, data: bytes) -> int:
"""Write bytes to serial port.

Parameters
----------
data : int
Bytes to write to the serial port.

Returns
-------
int
Number of bytes written.
"""
return self._ser.write(data)

def __repr__(self) -> str: # noqa
return (
f"{self.__class__.__name__}"
"["
f"{self.port}, "
f"{self.baudrate} baud"
"]"
)
Loading
Loading