Initial release v1.1.0 — ESP32 automated telescope flat panel
Firmware (Arduino Nano ESP32 / PlatformIO): - Native ASCOM Alpaca CoverCalibrator REST API on port 11111 - Alnitak serial protocol on USB at 9600 baud (simultaneous with WiFi) - MG995 servo cover mechanism with non-blocking state machine (D9) - LED brightness PWM via IRLZ44N MOSFET, LEDC channel 0 (D3) - 12V dew heater PWM via IRLZ44N MOSFET, LEDC channel 1 (D5) - mDNS + UDP Alpaca discovery, WiFi watchdog reconnect - SERIAL_DEBUG flag to silence debug output in USB-only mode INDI driver (C++ / libcurl / nlohmann-json): - WiFi mode: HTTP Alpaca via libcurl - USB mode: Alnitak serial via POSIX termios - LightBoxInterface + DustCapInterface + dew heater number property Python controller (PyQt6): - Dark-themed desktop app for direct manual control - AlpacaBackend (requests) + AlnitakBackend (pyserial) - PollWorker QThread; cover, brightness, dew heater panels - QSettings persistence; auto serial port discovery Docs: - system-diagram.svg, wiring-diagram.svg (browser-renderable SVG) - BUILD_NOTES.md with BOM, LM2596 calibration, power-on checklist - WIRING.md quick-reference, README.md, CHANGELOG.md
This commit is contained in:
commit
c32f00a2be
15 changed files with 3653 additions and 0 deletions
909
controller/flatpanel_controller.py
Normal file
909
controller/flatpanel_controller.py
Normal file
|
|
@ -0,0 +1,909 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
ESP32 FlatPanel Controller
|
||||
===========================
|
||||
Manual control application for the ESP32 telescope flat panel.
|
||||
|
||||
Supports:
|
||||
WiFi mode – ASCOM Alpaca REST API over HTTP
|
||||
USB mode – Alnitak serial protocol at 9600 baud
|
||||
|
||||
Requirements:
|
||||
pip install PyQt6 requests pyserial
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import threading
|
||||
from abc import ABC, abstractmethod
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
try:
|
||||
import requests
|
||||
except ImportError:
|
||||
print("ERROR: 'requests' not installed. Run: pip install requests")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
import serial
|
||||
import serial.tools.list_ports
|
||||
except ImportError:
|
||||
print("ERROR: 'pyserial' not installed. Run: pip install pyserial")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
from PyQt6.QtCore import (Qt, QTimer, QSettings, QThread, QSize,
|
||||
pyqtSignal, QObject)
|
||||
from PyQt6.QtGui import QColor, QPalette, QFont, QIcon, QPixmap
|
||||
from PyQt6.QtWidgets import (
|
||||
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
|
||||
QGroupBox, QLabel, QPushButton, QSlider, QComboBox, QLineEdit,
|
||||
QPlainTextEdit, QRadioButton, QButtonGroup, QFrame, QSplitter,
|
||||
QSizePolicy, QSpacerItem, QSpinBox, QToolButton, QStatusBar,
|
||||
)
|
||||
except ImportError:
|
||||
print("ERROR: 'PyQt6' not installed. Run: pip install PyQt6")
|
||||
sys.exit(1)
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Cover / calibrator state codes (ASCOM Alpaca)
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
COVER_LABELS = {
|
||||
0: ("NOT PRESENT", "#9e9e9e"),
|
||||
1: ("UNKNOWN", "#9e9e9e"),
|
||||
2: ("OPEN", "#42a5f5"), # blue – panel clear of telescope
|
||||
3: ("CLOSED", "#ff9800"), # orange – panel in flat position
|
||||
4: ("MOVING", "#ffeb3b"), # yellow
|
||||
5: ("ERROR", "#ef5350"),
|
||||
}
|
||||
|
||||
CAL_LABELS = {
|
||||
0: ("N/A", "#9e9e9e"),
|
||||
1: ("OFF", "#9e9e9e"),
|
||||
2: ("NOT READY", "#ffeb3b"),
|
||||
3: ("ON", "#4caf50"),
|
||||
4: ("UNKNOWN", "#9e9e9e"),
|
||||
5: ("ERROR", "#ef5350"),
|
||||
}
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Device state dataclass
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class DeviceState:
|
||||
__slots__ = ("cover", "calibrator", "brightness", "max_brightness", "dew")
|
||||
|
||||
def __init__(self):
|
||||
self.cover: int = 1 # CoverState.Unknown
|
||||
self.calibrator: int = 1 # CalibratorState.Off
|
||||
self.brightness: int = 0
|
||||
self.max_brightness: int = 255
|
||||
self.dew: int = 0
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Abstract backend protocol
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class Backend(ABC):
|
||||
@abstractmethod
|
||||
def connect(self) -> None: ...
|
||||
@abstractmethod
|
||||
def disconnect(self) -> None: ...
|
||||
@abstractmethod
|
||||
def get_state(self) -> DeviceState: ...
|
||||
@abstractmethod
|
||||
def calibrator_on(self, brightness: int) -> None: ...
|
||||
@abstractmethod
|
||||
def calibrator_off(self) -> None: ...
|
||||
@abstractmethod
|
||||
def open_cover(self) -> None: ...
|
||||
@abstractmethod
|
||||
def close_cover(self) -> None: ...
|
||||
@abstractmethod
|
||||
def halt_cover(self) -> None: ...
|
||||
@abstractmethod
|
||||
def set_dew(self, pct: int) -> None: ...
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# WiFi / ASCOM Alpaca backend
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class AlpacaBackend(Backend):
|
||||
_TIMEOUT = 4
|
||||
|
||||
def __init__(self, host: str, port: int):
|
||||
self._base = f"http://{host}:{port}/api/v1/covercalibrator/0"
|
||||
self._tx = 0
|
||||
self._lock = threading.Lock()
|
||||
self._session = requests.Session()
|
||||
|
||||
def _tx_id(self) -> int:
|
||||
with self._lock:
|
||||
self._tx += 1
|
||||
return self._tx
|
||||
|
||||
def _get(self, prop: str):
|
||||
url = f"{self._base}/{prop}?ClientID=1&ClientTransactionID={self._tx_id()}"
|
||||
r = self._session.get(url, timeout=self._TIMEOUT)
|
||||
r.raise_for_status()
|
||||
j = r.json()
|
||||
if j.get("ErrorNumber", -1) != 0:
|
||||
raise IOError(j.get("ErrorMessage", "Alpaca error"))
|
||||
return j.get("Value")
|
||||
|
||||
def _put(self, endpoint: str, **params):
|
||||
body = {"ClientID": 1, "ClientTransactionID": self._tx_id(), **params}
|
||||
url = f"{self._base}/{endpoint}"
|
||||
r = self._session.put(url, data=body, timeout=self._TIMEOUT)
|
||||
r.raise_for_status()
|
||||
j = r.json()
|
||||
if j.get("ErrorNumber", -1) != 0:
|
||||
raise IOError(j.get("ErrorMessage", "Alpaca error"))
|
||||
|
||||
def connect(self) -> None:
|
||||
self._get("interfaceversion")
|
||||
self._put("connected", Connected="true")
|
||||
|
||||
def disconnect(self) -> None:
|
||||
try:
|
||||
self._put("connected", Connected="false")
|
||||
except Exception:
|
||||
pass
|
||||
self._session.close()
|
||||
|
||||
def get_state(self) -> DeviceState:
|
||||
s = DeviceState()
|
||||
s.cover = int(self._get("coverstate"))
|
||||
s.calibrator = int(self._get("calibratorstate"))
|
||||
s.brightness = int(self._get("brightness"))
|
||||
s.max_brightness = int(self._get("maxbrightness"))
|
||||
try:
|
||||
s.dew = int(self._get("dewheater"))
|
||||
except Exception:
|
||||
s.dew = 0
|
||||
return s
|
||||
|
||||
def calibrator_on(self, brightness: int) -> None:
|
||||
self._put("calibratoron", Brightness=brightness)
|
||||
|
||||
def calibrator_off(self) -> None:
|
||||
self._put("calibratoroff")
|
||||
|
||||
def open_cover(self) -> None:
|
||||
self._put("opencover")
|
||||
|
||||
def close_cover(self) -> None:
|
||||
self._put("closecover")
|
||||
|
||||
def halt_cover(self) -> None:
|
||||
self._put("haltcover")
|
||||
|
||||
def set_dew(self, pct: int) -> None:
|
||||
self._put("dewheater", Percentage=pct)
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# USB / Alnitak serial backend
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class AlnitakBackend(Backend):
|
||||
_BAUD = 9600
|
||||
_TIMEOUT = 2.5 # seconds per command
|
||||
|
||||
def __init__(self, port: str):
|
||||
self._port_name = port
|
||||
self._ser: Optional[serial.Serial] = None
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _send(self, cmd: str) -> str:
|
||||
"""Send an Alnitak command; return the stripped response line."""
|
||||
with self._lock:
|
||||
if self._ser is None or not self._ser.is_open:
|
||||
raise IOError("Serial port not open")
|
||||
self._ser.reset_input_buffer()
|
||||
self._ser.write((cmd + "\r").encode("ascii"))
|
||||
self._ser.flush()
|
||||
# Read until \r or timeout
|
||||
raw = b""
|
||||
while True:
|
||||
chunk = self._ser.read(64)
|
||||
if not chunk:
|
||||
break
|
||||
raw += chunk
|
||||
if b"\r" in raw:
|
||||
break
|
||||
text = raw.decode("ascii", errors="ignore").strip()
|
||||
# Return first line that starts with '*'
|
||||
for line in text.splitlines():
|
||||
if line.startswith("*"):
|
||||
return line
|
||||
return text
|
||||
|
||||
def _int_payload(self, resp: str) -> int:
|
||||
"""Extract the numeric payload from *CDDVVV (last 3+ chars)."""
|
||||
if len(resp) >= 5:
|
||||
try:
|
||||
return int(resp[4:])
|
||||
except ValueError:
|
||||
pass
|
||||
return 0
|
||||
|
||||
def connect(self) -> None:
|
||||
import time
|
||||
self._ser = serial.Serial(self._port_name, self._BAUD, timeout=self._TIMEOUT)
|
||||
time.sleep(1.5) # wait for Arduino CDC reset
|
||||
resp = self._send(">P000")
|
||||
if not resp.startswith("*P"):
|
||||
raise IOError(f"No valid Alnitak ping response: {resp!r}")
|
||||
|
||||
def disconnect(self) -> None:
|
||||
if self._ser and self._ser.is_open:
|
||||
self._ser.close()
|
||||
self._ser = None
|
||||
|
||||
def get_state(self) -> DeviceState:
|
||||
s = DeviceState()
|
||||
s.max_brightness = 255
|
||||
|
||||
state_r = self._send(">S000")
|
||||
bright_r = self._send(">J000")
|
||||
dew_r = self._send(">U000")
|
||||
|
||||
# State: *SDD<motor1><light1>00
|
||||
if len(state_r) >= 7:
|
||||
try:
|
||||
motor = int(state_r[4])
|
||||
light = int(state_r[5])
|
||||
s.cover = {1: 3, 2: 2}.get(motor, 4) # Alnitak 1=Closed→3, 2=Open→2
|
||||
s.calibrator = 3 if light else 1
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
|
||||
if bright_r.startswith("*J") and len(bright_r) >= 7:
|
||||
try:
|
||||
s.brightness = int(bright_r[4:7])
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if dew_r.startswith("*U") and len(dew_r) >= 7:
|
||||
try:
|
||||
s.dew = int(dew_r[4:7])
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return s
|
||||
|
||||
def calibrator_on(self, brightness: int) -> None:
|
||||
self._send(f">B{brightness:03d}")
|
||||
self._send(">L000")
|
||||
|
||||
def calibrator_off(self) -> None:
|
||||
self._send(">D000")
|
||||
|
||||
def open_cover(self) -> None:
|
||||
self._send(">O000")
|
||||
|
||||
def close_cover(self) -> None:
|
||||
self._send(">C000")
|
||||
|
||||
def halt_cover(self) -> None:
|
||||
self._send(">H000")
|
||||
|
||||
def set_dew(self, pct: int) -> None:
|
||||
self._send(f">T{pct:03d}")
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Background polling thread
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class PollWorker(QThread):
|
||||
state_ready = pyqtSignal(object) # DeviceState
|
||||
poll_error = pyqtSignal(str)
|
||||
|
||||
def __init__(self, backend: Backend, parent=None):
|
||||
super().__init__(parent)
|
||||
self._backend = backend
|
||||
self._stop_evt = threading.Event()
|
||||
|
||||
def run(self):
|
||||
while not self._stop_evt.is_set():
|
||||
try:
|
||||
state = self._backend.get_state()
|
||||
self.state_ready.emit(state)
|
||||
except Exception as exc:
|
||||
self.poll_error.emit(str(exc))
|
||||
self._stop_evt.wait(timeout=1.0)
|
||||
|
||||
def stop(self):
|
||||
self._stop_evt.set()
|
||||
self.wait(3000)
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Reusable widgets
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class StatusDot(QLabel):
|
||||
"""A coloured ● label used as a status indicator."""
|
||||
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent)
|
||||
self.setFont(QFont("Segoe UI", 11))
|
||||
self.set_state(1) # unknown / grey
|
||||
|
||||
def set_state(self, cover_or_cal: int, table: dict = COVER_LABELS):
|
||||
text, colour = table.get(cover_or_cal, ("?", "#9e9e9e"))
|
||||
self.setText(
|
||||
f'<span style="color:{colour}; font-size:18px;">●</span>'
|
||||
f'<span style="margin-left:6px; color:#e0e0e0;"> {text}</span>'
|
||||
)
|
||||
|
||||
|
||||
class LinkedSliderSpin(QWidget):
|
||||
"""Horizontal slider and spinbox kept in sync."""
|
||||
value_committed = pyqtSignal(int) # emitted when user stops dragging
|
||||
|
||||
def __init__(self, lo: int, hi: int, step: int = 1, parent=None):
|
||||
super().__init__(parent)
|
||||
self._updating = False
|
||||
|
||||
self.slider = QSlider(Qt.Orientation.Horizontal)
|
||||
self.slider.setRange(lo, hi)
|
||||
self.slider.setSingleStep(step)
|
||||
self.slider.setPageStep(step * 10)
|
||||
|
||||
self.spin = QSpinBox()
|
||||
self.spin.setRange(lo, hi)
|
||||
self.spin.setSingleStep(step)
|
||||
self.spin.setFixedWidth(62)
|
||||
|
||||
lay = QHBoxLayout(self)
|
||||
lay.setContentsMargins(0, 0, 0, 0)
|
||||
lay.addWidget(self.slider, 1)
|
||||
lay.addWidget(self.spin)
|
||||
|
||||
self.slider.sliderReleased.connect(self._slider_released)
|
||||
self.slider.valueChanged.connect(self._slider_moved)
|
||||
self.spin.editingFinished.connect(self._spin_finished)
|
||||
|
||||
def _slider_moved(self, v: int):
|
||||
if not self._updating:
|
||||
self._updating = True
|
||||
self.spin.setValue(v)
|
||||
self._updating = False
|
||||
|
||||
def _slider_released(self):
|
||||
self.value_committed.emit(self.slider.value())
|
||||
|
||||
def _spin_finished(self):
|
||||
if not self._updating:
|
||||
self._updating = True
|
||||
self.slider.setValue(self.spin.value())
|
||||
self._updating = False
|
||||
self.value_committed.emit(self.spin.value())
|
||||
|
||||
def set_value_silent(self, v: int):
|
||||
self._updating = True
|
||||
self.slider.setValue(v)
|
||||
self.spin.setValue(v)
|
||||
self._updating = False
|
||||
|
||||
def value(self) -> int:
|
||||
return self.slider.value()
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Main window
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class MainWindow(QMainWindow):
|
||||
_log_signal = pyqtSignal(str)
|
||||
_state_signal = pyqtSignal(object)
|
||||
_err_signal = pyqtSignal(str)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.setWindowTitle("ESP32 FlatPanel Controller")
|
||||
self.setMinimumSize(740, 620)
|
||||
|
||||
self._backend: Optional[Backend] = None
|
||||
self._poller: Optional[PollWorker] = None
|
||||
self._pool = ThreadPoolExecutor(max_workers=1)
|
||||
self._settings = QSettings("FlatPanel", "Controller")
|
||||
self._last_brightness = 100
|
||||
self._last_dew = 0
|
||||
self._dew_on = False
|
||||
|
||||
self._log_signal.connect(self._append_log)
|
||||
self._state_signal.connect(self._apply_state)
|
||||
self._err_signal.connect(self._on_poll_error)
|
||||
|
||||
self._build_ui()
|
||||
self._load_settings()
|
||||
|
||||
# ── UI construction ──────────────────────────────────────────────────────
|
||||
|
||||
def _build_ui(self):
|
||||
root = QWidget()
|
||||
self.setCentralWidget(root)
|
||||
main_lay = QVBoxLayout(root)
|
||||
main_lay.setSpacing(8)
|
||||
main_lay.setContentsMargins(10, 10, 10, 10)
|
||||
|
||||
main_lay.addWidget(self._build_connection_panel())
|
||||
|
||||
ctrl_row = QHBoxLayout()
|
||||
ctrl_row.addWidget(self._build_cover_panel(), 1)
|
||||
ctrl_row.addWidget(self._build_flat_panel(), 1)
|
||||
ctrl_row.addWidget(self._build_dew_panel(), 1)
|
||||
main_lay.addLayout(ctrl_row)
|
||||
|
||||
main_lay.addWidget(self._build_log_panel(), 1)
|
||||
|
||||
self.status_bar = QStatusBar()
|
||||
self.setStatusBar(self.status_bar)
|
||||
self.status_bar.showMessage("Not connected")
|
||||
|
||||
def _build_connection_panel(self) -> QGroupBox:
|
||||
gb = QGroupBox("Connection")
|
||||
lay = QHBoxLayout(gb)
|
||||
|
||||
# Mode radio buttons
|
||||
mode_box = QVBoxLayout()
|
||||
self.rb_wifi = QRadioButton("WiFi (Alpaca)")
|
||||
self.rb_usb = QRadioButton("USB (Alnitak)")
|
||||
self.rb_wifi.setChecked(True)
|
||||
self._mode_group = QButtonGroup()
|
||||
self._mode_group.addButton(self.rb_wifi, 0)
|
||||
self._mode_group.addButton(self.rb_usb, 1)
|
||||
mode_box.addWidget(self.rb_wifi)
|
||||
mode_box.addWidget(self.rb_usb)
|
||||
lay.addLayout(mode_box)
|
||||
lay.addSpacing(10)
|
||||
|
||||
# WiFi settings
|
||||
self._wifi_widget = QWidget()
|
||||
wifi_lay = QHBoxLayout(self._wifi_widget)
|
||||
wifi_lay.setContentsMargins(0, 0, 0, 0)
|
||||
wifi_lay.addWidget(QLabel("Host:"))
|
||||
self.le_host = QLineEdit("esp32-flatpanel.local")
|
||||
self.le_host.setMinimumWidth(200)
|
||||
wifi_lay.addWidget(self.le_host)
|
||||
wifi_lay.addWidget(QLabel("Port:"))
|
||||
self.sb_port = QSpinBox()
|
||||
self.sb_port.setRange(1, 65535)
|
||||
self.sb_port.setValue(11111)
|
||||
self.sb_port.setFixedWidth(70)
|
||||
wifi_lay.addWidget(self.sb_port)
|
||||
lay.addWidget(self._wifi_widget)
|
||||
|
||||
# USB settings
|
||||
self._usb_widget = QWidget()
|
||||
usb_lay = QHBoxLayout(self._usb_widget)
|
||||
usb_lay.setContentsMargins(0, 0, 0, 0)
|
||||
usb_lay.addWidget(QLabel("Port:"))
|
||||
self.cb_serial = QComboBox()
|
||||
self.cb_serial.setMinimumWidth(180)
|
||||
self._refresh_ports()
|
||||
usb_lay.addWidget(self.cb_serial)
|
||||
btn_refresh = QToolButton()
|
||||
btn_refresh.setText("⟳")
|
||||
btn_refresh.setToolTip("Refresh port list")
|
||||
btn_refresh.clicked.connect(self._refresh_ports)
|
||||
usb_lay.addWidget(btn_refresh)
|
||||
lay.addWidget(self._usb_widget)
|
||||
self._usb_widget.hide()
|
||||
|
||||
lay.addStretch()
|
||||
|
||||
# Connect button + status
|
||||
self.btn_connect = QPushButton("Connect")
|
||||
self.btn_connect.setFixedWidth(100)
|
||||
self.btn_connect.clicked.connect(self._on_connect_toggle)
|
||||
lay.addWidget(self.btn_connect)
|
||||
|
||||
self.lbl_conn_status = StatusDot()
|
||||
self.lbl_conn_status.set_state(1)
|
||||
lay.addWidget(self.lbl_conn_status)
|
||||
|
||||
self.rb_wifi.toggled.connect(self._mode_changed)
|
||||
self.rb_usb.toggled.connect(self._mode_changed)
|
||||
return gb
|
||||
|
||||
def _build_cover_panel(self) -> QGroupBox:
|
||||
gb = QGroupBox("Cover")
|
||||
lay = QVBoxLayout(gb)
|
||||
|
||||
self.lbl_cover = StatusDot()
|
||||
self.lbl_cover.set_state(1, COVER_LABELS)
|
||||
lay.addWidget(self.lbl_cover, alignment=Qt.AlignmentFlag.AlignHCenter)
|
||||
|
||||
lay.addSpacing(8)
|
||||
|
||||
btn_row = QHBoxLayout()
|
||||
self.btn_open = self._cmd_btn("Open", "#1565c0", self._do_open_cover)
|
||||
self.btn_close = self._cmd_btn("Close", "#e65100", self._do_close_cover)
|
||||
btn_row.addWidget(self.btn_open)
|
||||
btn_row.addWidget(self.btn_close)
|
||||
lay.addLayout(btn_row)
|
||||
|
||||
self.btn_halt = self._cmd_btn("Halt", "#b71c1c", self._do_halt_cover)
|
||||
lay.addWidget(self.btn_halt)
|
||||
lay.addStretch()
|
||||
self._set_controls_enabled(False)
|
||||
return gb
|
||||
|
||||
def _build_flat_panel(self) -> QGroupBox:
|
||||
gb = QGroupBox("Flat Panel")
|
||||
lay = QVBoxLayout(gb)
|
||||
|
||||
self.lbl_cal = StatusDot()
|
||||
self.lbl_cal.set_state(1, CAL_LABELS)
|
||||
lay.addWidget(self.lbl_cal, alignment=Qt.AlignmentFlag.AlignHCenter)
|
||||
|
||||
lay.addSpacing(4)
|
||||
lay.addWidget(QLabel("Brightness (0 – 255):"))
|
||||
self.slider_bright = LinkedSliderSpin(0, 255, 1)
|
||||
self.slider_bright.set_value_silent(100)
|
||||
self.slider_bright.value_committed.connect(self._brightness_committed)
|
||||
lay.addWidget(self.slider_bright)
|
||||
|
||||
lay.addSpacing(4)
|
||||
btn_row = QHBoxLayout()
|
||||
self.btn_cal_on = self._cmd_btn("Light On", "#2e7d32", self._do_cal_on)
|
||||
self.btn_cal_off = self._cmd_btn("Light Off", "#555555", self._do_cal_off)
|
||||
btn_row.addWidget(self.btn_cal_on)
|
||||
btn_row.addWidget(self.btn_cal_off)
|
||||
lay.addLayout(btn_row)
|
||||
lay.addStretch()
|
||||
return gb
|
||||
|
||||
def _build_dew_panel(self) -> QGroupBox:
|
||||
gb = QGroupBox("Dew Heater (12 V)")
|
||||
lay = QVBoxLayout(gb)
|
||||
|
||||
self.lbl_dew = QLabel(
|
||||
'<span style="color:#9e9e9e; font-size:18px;">●</span>'
|
||||
'<span style="color:#e0e0e0;"> OFF 0 %</span>'
|
||||
)
|
||||
self.lbl_dew.setFont(QFont("Segoe UI", 11))
|
||||
lay.addWidget(self.lbl_dew, alignment=Qt.AlignmentFlag.AlignHCenter)
|
||||
|
||||
lay.addSpacing(4)
|
||||
lay.addWidget(QLabel("Power (0 – 100 %):"))
|
||||
self.slider_dew = LinkedSliderSpin(0, 100, 5)
|
||||
self.slider_dew.set_value_silent(0)
|
||||
self.slider_dew.value_committed.connect(self._dew_committed)
|
||||
lay.addWidget(self.slider_dew)
|
||||
|
||||
lay.addSpacing(4)
|
||||
btn_row = QHBoxLayout()
|
||||
self.btn_dew_on = self._cmd_btn("Heater On", "#bf360c", self._do_dew_on)
|
||||
self.btn_dew_off = self._cmd_btn("Heater Off", "#555555", self._do_dew_off)
|
||||
btn_row.addWidget(self.btn_dew_on)
|
||||
btn_row.addWidget(self.btn_dew_off)
|
||||
lay.addLayout(btn_row)
|
||||
lay.addStretch()
|
||||
return gb
|
||||
|
||||
def _build_log_panel(self) -> QGroupBox:
|
||||
gb = QGroupBox("Log")
|
||||
lay = QVBoxLayout(gb)
|
||||
self.log_view = QPlainTextEdit()
|
||||
self.log_view.setReadOnly(True)
|
||||
self.log_view.setMaximumBlockCount(500)
|
||||
self.log_view.setFont(QFont("Courier New", 9))
|
||||
lay.addWidget(self.log_view)
|
||||
|
||||
btn_clear = QPushButton("Clear")
|
||||
btn_clear.setFixedWidth(70)
|
||||
btn_clear.clicked.connect(self.log_view.clear)
|
||||
row = QHBoxLayout()
|
||||
row.addStretch()
|
||||
row.addWidget(btn_clear)
|
||||
lay.addLayout(row)
|
||||
return gb
|
||||
|
||||
@staticmethod
|
||||
def _cmd_btn(label: str, bg: str, slot) -> QPushButton:
|
||||
btn = QPushButton(label)
|
||||
btn.setStyleSheet(
|
||||
f"QPushButton {{ background-color:{bg}; color:white; "
|
||||
f"border-radius:4px; padding:5px 10px; }}"
|
||||
f"QPushButton:hover {{ background-color:{bg}dd; }}"
|
||||
f"QPushButton:disabled {{ background-color:#333; color:#666; }}"
|
||||
)
|
||||
btn.clicked.connect(slot)
|
||||
return btn
|
||||
|
||||
# ── Settings ─────────────────────────────────────────────────────────────
|
||||
|
||||
def _load_settings(self):
|
||||
mode = self._settings.value("mode", "wifi")
|
||||
self.rb_usb.setChecked(mode == "usb")
|
||||
self.rb_wifi.setChecked(mode == "wifi")
|
||||
self.le_host.setText(self._settings.value("host", "esp32-flatpanel.local"))
|
||||
self.sb_port.setValue(int(self._settings.value("port", 11111)))
|
||||
saved_serial = self._settings.value("serial_port", "")
|
||||
idx = self.cb_serial.findText(saved_serial)
|
||||
if idx >= 0:
|
||||
self.cb_serial.setCurrentIndex(idx)
|
||||
self._last_brightness = int(self._settings.value("last_brightness", 100))
|
||||
self._last_dew = int(self._settings.value("last_dew", 0))
|
||||
self.slider_bright.set_value_silent(self._last_brightness)
|
||||
self.slider_dew.set_value_silent(self._last_dew)
|
||||
|
||||
def _save_settings(self):
|
||||
self._settings.setValue("mode", "usb" if self.rb_usb.isChecked() else "wifi")
|
||||
self._settings.setValue("host", self.le_host.text())
|
||||
self._settings.setValue("port", self.sb_port.value())
|
||||
self._settings.setValue("serial_port", self.cb_serial.currentText())
|
||||
self._settings.setValue("last_brightness", self.slider_bright.value())
|
||||
self._settings.setValue("last_dew", self.slider_dew.value())
|
||||
|
||||
# ── UI helpers ────────────────────────────────────────────────────────────
|
||||
|
||||
def _mode_changed(self):
|
||||
usb = self.rb_usb.isChecked()
|
||||
self._wifi_widget.setVisible(not usb)
|
||||
self._usb_widget.setVisible(usb)
|
||||
|
||||
def _refresh_ports(self):
|
||||
prev = self.cb_serial.currentText()
|
||||
self.cb_serial.clear()
|
||||
ports = [p.device for p in serial.tools.list_ports.comports()]
|
||||
self.cb_serial.addItems(ports if ports else ["(no ports found)"])
|
||||
idx = self.cb_serial.findText(prev)
|
||||
if idx >= 0:
|
||||
self.cb_serial.setCurrentIndex(idx)
|
||||
|
||||
def _set_controls_enabled(self, enabled: bool):
|
||||
for w in (self.btn_open, self.btn_close, self.btn_halt,
|
||||
self.btn_cal_on, self.btn_cal_off,
|
||||
self.btn_dew_on, self.btn_dew_off,
|
||||
self.slider_bright, self.slider_dew):
|
||||
w.setEnabled(enabled)
|
||||
|
||||
# ── Connection ────────────────────────────────────────────────────────────
|
||||
|
||||
def _on_connect_toggle(self):
|
||||
if self._backend is not None:
|
||||
self._disconnect()
|
||||
else:
|
||||
self._connect()
|
||||
|
||||
def _connect(self):
|
||||
self.btn_connect.setEnabled(False)
|
||||
self.status_bar.showMessage("Connecting…")
|
||||
|
||||
def worker():
|
||||
try:
|
||||
if self.rb_usb.isChecked():
|
||||
b = AlnitakBackend(self.cb_serial.currentText())
|
||||
else:
|
||||
b = AlpacaBackend(self.le_host.text(), self.sb_port.value())
|
||||
b.connect()
|
||||
return b
|
||||
except Exception as exc:
|
||||
return exc
|
||||
|
||||
def done(fut):
|
||||
result = fut.result()
|
||||
if isinstance(result, Exception):
|
||||
self._log_signal.emit(f"Connection failed: {result}")
|
||||
self.status_bar.showMessage("Connection failed")
|
||||
self.btn_connect.setEnabled(True)
|
||||
return
|
||||
self._backend = result
|
||||
self._save_settings()
|
||||
self._poller = PollWorker(self._backend)
|
||||
self._poller.state_ready.connect(self._state_signal)
|
||||
self._poller.poll_error.connect(self._err_signal)
|
||||
self._poller.start()
|
||||
self.btn_connect.setText("Disconnect")
|
||||
self.btn_connect.setEnabled(True)
|
||||
self._set_controls_enabled(True)
|
||||
mode = "USB/Alnitak" if self.rb_usb.isChecked() else f"WiFi ({self.le_host.text()})"
|
||||
self._log_signal.emit(f"Connected via {mode}")
|
||||
self.status_bar.showMessage(f"Connected – {mode}")
|
||||
|
||||
self._pool.submit(worker).add_done_callback(
|
||||
lambda f: QTimer.singleShot(0, lambda: done(f))
|
||||
)
|
||||
|
||||
def _disconnect(self):
|
||||
if self._poller:
|
||||
self._poller.stop()
|
||||
self._poller = None
|
||||
if self._backend:
|
||||
try:
|
||||
self._backend.disconnect()
|
||||
except Exception:
|
||||
pass
|
||||
self._backend = None
|
||||
self.btn_connect.setText("Connect")
|
||||
self._set_controls_enabled(False)
|
||||
self.lbl_conn_status.set_state(1)
|
||||
self.lbl_cover.set_state(1, COVER_LABELS)
|
||||
self.lbl_cal.set_state(1, CAL_LABELS)
|
||||
self.status_bar.showMessage("Disconnected")
|
||||
self._log_signal.emit("Disconnected.")
|
||||
|
||||
# ── Commands ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _run(self, fn, *args, msg=""):
|
||||
"""Submit a backend command to the thread pool, log result."""
|
||||
if self._backend is None:
|
||||
return
|
||||
|
||||
def worker():
|
||||
try:
|
||||
fn(*args)
|
||||
if msg:
|
||||
self._log_signal.emit(msg)
|
||||
except Exception as e:
|
||||
self._log_signal.emit(f"ERROR: {e}")
|
||||
|
||||
self._pool.submit(worker)
|
||||
|
||||
def _do_open_cover(self):
|
||||
self._run(self._backend.open_cover, msg="→ Open cover")
|
||||
|
||||
def _do_close_cover(self):
|
||||
self._run(self._backend.close_cover, msg="→ Close cover")
|
||||
|
||||
def _do_halt_cover(self):
|
||||
self._run(self._backend.halt_cover, msg="→ Halt cover")
|
||||
|
||||
def _do_cal_on(self):
|
||||
b = self.slider_bright.value()
|
||||
self._last_brightness = b
|
||||
self._run(self._backend.calibrator_on, b, msg=f"→ Light ON brightness={b}")
|
||||
|
||||
def _do_cal_off(self):
|
||||
self._run(self._backend.calibrator_off, msg="→ Light OFF")
|
||||
|
||||
def _brightness_committed(self, v: int):
|
||||
self._last_brightness = v
|
||||
self._run(self._backend.calibrator_on, v, msg=f"→ Brightness set to {v}")
|
||||
|
||||
def _do_dew_on(self):
|
||||
pct = max(self._last_dew, self.slider_dew.value()) or 50
|
||||
self.slider_dew.set_value_silent(pct)
|
||||
self._dew_on = True
|
||||
self._run(self._backend.set_dew, pct, msg=f"→ Dew heater ON {pct}%")
|
||||
|
||||
def _do_dew_off(self):
|
||||
self._last_dew = self.slider_dew.value()
|
||||
self._dew_on = False
|
||||
self.slider_dew.set_value_silent(0)
|
||||
self._run(self._backend.set_dew, 0, msg="→ Dew heater OFF")
|
||||
|
||||
def _dew_committed(self, v: int):
|
||||
self._last_dew = v if v > 0 else self._last_dew
|
||||
self._run(self._backend.set_dew, v, msg=f"→ Dew heater {v}%")
|
||||
|
||||
# ── State update (called on GUI thread via signal) ────────────────────────
|
||||
|
||||
def _apply_state(self, state: DeviceState):
|
||||
self.lbl_conn_status.set_state(3, {3: ("CONNECTED", "#4caf50")})
|
||||
self.lbl_cover.set_state(state.cover, COVER_LABELS)
|
||||
self.lbl_cal.set_state(state.calibrator, CAL_LABELS)
|
||||
self.slider_bright.set_value_silent(state.brightness)
|
||||
|
||||
dew = state.dew
|
||||
colour = "#ff7043" if dew > 0 else "#9e9e9e"
|
||||
self.lbl_dew.setText(
|
||||
f'<span style="color:{colour}; font-size:18px;">●</span>'
|
||||
f'<span style="color:#e0e0e0;"> {"ON" if dew > 0 else "OFF"} {dew}%</span>'
|
||||
)
|
||||
self.slider_dew.set_value_silent(dew)
|
||||
|
||||
def _on_poll_error(self, msg: str):
|
||||
self.status_bar.showMessage(f"Poll error: {msg}")
|
||||
|
||||
# ── Log ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def _append_log(self, text: str):
|
||||
ts = datetime.now().strftime("%H:%M:%S")
|
||||
self.log_view.appendPlainText(f"[{ts}] {text}")
|
||||
|
||||
# ── Cleanup ───────────────────────────────────────────────────────────────
|
||||
|
||||
def closeEvent(self, event):
|
||||
self._save_settings()
|
||||
self._disconnect()
|
||||
self._pool.shutdown(wait=False)
|
||||
super().closeEvent(event)
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Dark theme
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def apply_dark_theme(app: QApplication):
|
||||
app.setStyle("Fusion")
|
||||
pal = QPalette()
|
||||
C = QColor
|
||||
pal.setColor(QPalette.ColorRole.Window, C(26, 26, 26))
|
||||
pal.setColor(QPalette.ColorRole.WindowText, C(224, 224, 224))
|
||||
pal.setColor(QPalette.ColorRole.Base, C(38, 38, 38))
|
||||
pal.setColor(QPalette.ColorRole.AlternateBase, C(45, 45, 45))
|
||||
pal.setColor(QPalette.ColorRole.ToolTipBase, C(50, 50, 50))
|
||||
pal.setColor(QPalette.ColorRole.ToolTipText, C(220, 220, 220))
|
||||
pal.setColor(QPalette.ColorRole.Text, C(224, 224, 224))
|
||||
pal.setColor(QPalette.ColorRole.Button, C(50, 50, 50))
|
||||
pal.setColor(QPalette.ColorRole.ButtonText, C(224, 224, 224))
|
||||
pal.setColor(QPalette.ColorRole.BrightText, C(255, 255, 255))
|
||||
pal.setColor(QPalette.ColorRole.Link, C(66, 165, 245))
|
||||
pal.setColor(QPalette.ColorRole.Highlight, C(58, 122, 200))
|
||||
pal.setColor(QPalette.ColorRole.HighlightedText, C(255, 255, 255))
|
||||
pal.setColor(QPalette.ColorGroup.Disabled,
|
||||
QPalette.ColorRole.ButtonText, C(100, 100, 100))
|
||||
pal.setColor(QPalette.ColorGroup.Disabled,
|
||||
QPalette.ColorRole.Text, C(100, 100, 100))
|
||||
app.setPalette(pal)
|
||||
app.setStyleSheet("""
|
||||
QGroupBox {
|
||||
border: 1px solid #404040;
|
||||
border-radius: 6px;
|
||||
margin-top: 10px;
|
||||
padding-top: 6px;
|
||||
font-weight: bold;
|
||||
}
|
||||
QGroupBox::title {
|
||||
subcontrol-origin: margin;
|
||||
left: 10px;
|
||||
padding: 0 4px;
|
||||
color: #90caf9;
|
||||
}
|
||||
QSlider::groove:horizontal {
|
||||
height: 6px;
|
||||
background: #404040;
|
||||
border-radius: 3px;
|
||||
}
|
||||
QSlider::handle:horizontal {
|
||||
background: #3a7ac8;
|
||||
width: 16px;
|
||||
height: 16px;
|
||||
border-radius: 8px;
|
||||
margin: -5px 0;
|
||||
}
|
||||
QSlider::sub-page:horizontal {
|
||||
background: #3a7ac8;
|
||||
border-radius: 3px;
|
||||
}
|
||||
QLineEdit, QSpinBox, QComboBox, QPlainTextEdit {
|
||||
background: #2a2a2a;
|
||||
border: 1px solid #505050;
|
||||
border-radius: 4px;
|
||||
padding: 3px;
|
||||
}
|
||||
QPushButton {
|
||||
border-radius: 4px;
|
||||
padding: 5px 12px;
|
||||
}
|
||||
QPushButton:default {
|
||||
background-color: #3a7ac8;
|
||||
color: white;
|
||||
}
|
||||
""")
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Entry point
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
app = QApplication(sys.argv)
|
||||
app.setApplicationName("ESP32 FlatPanel Controller")
|
||||
app.setOrganizationName("FlatPanel")
|
||||
apply_dark_theme(app)
|
||||
win = MainWindow()
|
||||
win.show()
|
||||
sys.exit(app.exec())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue