Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions adafruit_portalbase/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,108 @@ def process_json(self, json_data, json_path):
values = json.dumps(json_data)
return values

def _tz_from_env(self, default: float = 0.0) -> float:
tz = default
v = os.getenv("NTP_TZ")
if v:
try:
tz = float(v)
except ValueError:
pass
v = os.getenv("NTP_DST")
if v:
try:
tz += float(v)
except ValueError:
pass
return tz

def _socketpool_for_wifi(self):
"""Return a SocketPool for whichever Wi-Fi backend is available.
Works with native ESP32-S2/S3/C6 (wifi.radio) and ESP32SPI coprocessors.
Some CP10 board wrappers may not expose .radio/.esp; in that case,
try the native wifi.radio directly.
"""
wm = getattr(self, "_wifi", None)
radio = getattr(wm, "radio", None)
esp = getattr(wm, "esp", None) or getattr(wm, "_esp", None)

# CP10/MagTag fallback: wrapper didn't expose .radio/.esp -> use native radio
if (radio is None) and (esp is None):
try:
import wifi as _wifi_mod # type: ignore

radio = getattr(_wifi_mod, "radio", None)
except Exception:
radio = None

target = radio if (radio is not None) else esp
if target is None:
raise RuntimeError("No WiFi radio/esp found")

# Prefer connection_manager helper, else direct SocketPool fallback
try:
from adafruit_connection_manager import get_radio_socketpool # lazy import

return get_radio_socketpool(target)
except Exception:
import socketpool # type: ignore

return socketpool.SocketPool(target)

def _wait_for_ready_optional(self, timeout: float = 10.0, poll: float = 0.05) -> None:
wm = getattr(self, "_wifi", None)
ready = getattr(wm, "esp32_ready", None)
if ready is None:
return
start = time.monotonic()
while time.monotonic() - start < timeout:
if not ready.value:
return
time.sleep(poll)
raise TimeoutError("ESP32 not responding")

def time_sync(self, server=None, timeout=None, retries=None, tz=None):
server = server or os.getenv("NTP_SERVER", "0.adafruit.pool.ntp.org")
retries = int(os.getenv("NTP_RETRIES", "8")) if retries is None else int(retries)
timeout = float(os.getenv("NTP_TIMEOUT", "5.0")) if timeout is None else float(timeout)
tz = self._tz_from_env() if tz is None else float(tz)

if not self.is_connected:
self.connect()

try:
self._wait_for_ready_optional()
except Exception:
pass

pool = self._socketpool_for_wifi()

last_exc = None
for _ in range(max(1, retries)):
try:
from adafruit_ntp import NTP # lazy import for CI/tests

try:
ntp = NTP(pool, server=server, tz=tz, socket_timeout=timeout)
except TypeError:
ntp = NTP(pool, server=server, tz_offset=tz, socket_timeout=timeout)

setter = getattr(ntp, "set_time", None)
if setter:
setter()
return time.localtime()

now = ntp.datetime
if rtc:
rtc.RTC().datetime = now
return now
except Exception as ex:
last_exc = ex
time.sleep(0.5)

raise last_exc or RuntimeError("NTP sync failed")

@property
def is_connected(self):
"""Return whether we are connected."""
Expand Down
2 changes: 1 addition & 1 deletion adafruit_portalbase/wifi_coprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import adafruit_connection_manager
import adafruit_requests
import board
from adafruit_esp32spi import adafruit_esp32spi, adafruit_esp32spi_wifimanager
from adafruit_esp32spi import adafruit_esp32spi
from digitalio import DigitalInOut

__version__ = "0.0.0+auto.0"
Expand Down
30 changes: 30 additions & 0 deletions examples/portalbase_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# SPDX-FileCopyrightText: 2025 Mikey Sklar for Adafruit Industries
#
# SPDX-License-Identifier: MIT

# Uncomment ONE board setup:

# --- PyPortal ---
# from adafruit_pyportal import PyPortal
# portal = PyPortal(status_neopixel=None)
# net = portal.network

# --- MatrixPortal M4 ---
# from adafruit_matrixportal.matrixportal import MatrixPortal
# portal = MatrixPortal(status_neopixel=None)
# net = portal.network

# --- MagTag (ESP32-S2 native WiFi) ---
# from adafruit_magtag.magtag import MagTag
# magtag = MagTag(status_neopixel=None)
# net = magtag.network

# --- Fruit Jam (wrap its WiFi) ---
from adafruit_fruitjam import FruitJam

jam = FruitJam(status_neopixel=None)
net = jam.network


# --- shared output ---
print(net.time_sync())