flatpanel/controller/flatpanel_controller.py
Laurence c32f00a2be Initial release v1.1.0 — ESP32 automated telescope flat panel
Firmware (Arduino Nano ESP32 / PlatformIO):
- Native ASCOM Alpaca CoverCalibrator REST API on port 11111
- Alnitak serial protocol on USB at 9600 baud (simultaneous with WiFi)
- MG995 servo cover mechanism with non-blocking state machine (D9)
- LED brightness PWM via IRLZ44N MOSFET, LEDC channel 0 (D3)
- 12V dew heater PWM via IRLZ44N MOSFET, LEDC channel 1 (D5)
- mDNS + UDP Alpaca discovery, WiFi watchdog reconnect
- SERIAL_DEBUG flag to silence debug output in USB-only mode

INDI driver (C++ / libcurl / nlohmann-json):
- WiFi mode: HTTP Alpaca via libcurl
- USB mode: Alnitak serial via POSIX termios
- LightBoxInterface + DustCapInterface + dew heater number property

Python controller (PyQt6):
- Dark-themed desktop app for direct manual control
- AlpacaBackend (requests) + AlnitakBackend (pyserial)
- PollWorker QThread; cover, brightness, dew heater panels
- QSettings persistence; auto serial port discovery

Docs:
- system-diagram.svg, wiring-diagram.svg (browser-renderable SVG)
- BUILD_NOTES.md with BOM, LM2596 calibration, power-on checklist
- WIRING.md quick-reference, README.md, CHANGELOG.md
2026-05-17 08:51:29 +01:00

909 lines
35 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
ESP32 FlatPanel Controller
===========================
Manual control application for the ESP32 telescope flat panel.
Supports:
WiFi mode ASCOM Alpaca REST API over HTTP
USB mode Alnitak serial protocol at 9600 baud
Requirements:
pip install PyQt6 requests pyserial
"""
from __future__ import annotations
import sys
import threading
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import Optional
try:
import requests
except ImportError:
print("ERROR: 'requests' not installed. Run: pip install requests")
sys.exit(1)
try:
import serial
import serial.tools.list_ports
except ImportError:
print("ERROR: 'pyserial' not installed. Run: pip install pyserial")
sys.exit(1)
try:
from PyQt6.QtCore import (Qt, QTimer, QSettings, QThread, QSize,
pyqtSignal, QObject)
from PyQt6.QtGui import QColor, QPalette, QFont, QIcon, QPixmap
from PyQt6.QtWidgets import (
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QGroupBox, QLabel, QPushButton, QSlider, QComboBox, QLineEdit,
QPlainTextEdit, QRadioButton, QButtonGroup, QFrame, QSplitter,
QSizePolicy, QSpacerItem, QSpinBox, QToolButton, QStatusBar,
)
except ImportError:
print("ERROR: 'PyQt6' not installed. Run: pip install PyQt6")
sys.exit(1)
# ─────────────────────────────────────────────────────────────────────────────
# Cover / calibrator state codes (ASCOM Alpaca)
# ─────────────────────────────────────────────────────────────────────────────
COVER_LABELS = {
0: ("NOT PRESENT", "#9e9e9e"),
1: ("UNKNOWN", "#9e9e9e"),
2: ("OPEN", "#42a5f5"), # blue panel clear of telescope
3: ("CLOSED", "#ff9800"), # orange panel in flat position
4: ("MOVING", "#ffeb3b"), # yellow
5: ("ERROR", "#ef5350"),
}
CAL_LABELS = {
0: ("N/A", "#9e9e9e"),
1: ("OFF", "#9e9e9e"),
2: ("NOT READY", "#ffeb3b"),
3: ("ON", "#4caf50"),
4: ("UNKNOWN", "#9e9e9e"),
5: ("ERROR", "#ef5350"),
}
# ─────────────────────────────────────────────────────────────────────────────
# Device state dataclass
# ─────────────────────────────────────────────────────────────────────────────
class DeviceState:
__slots__ = ("cover", "calibrator", "brightness", "max_brightness", "dew")
def __init__(self):
self.cover: int = 1 # CoverState.Unknown
self.calibrator: int = 1 # CalibratorState.Off
self.brightness: int = 0
self.max_brightness: int = 255
self.dew: int = 0
# ─────────────────────────────────────────────────────────────────────────────
# Abstract backend protocol
# ─────────────────────────────────────────────────────────────────────────────
class Backend(ABC):
@abstractmethod
def connect(self) -> None: ...
@abstractmethod
def disconnect(self) -> None: ...
@abstractmethod
def get_state(self) -> DeviceState: ...
@abstractmethod
def calibrator_on(self, brightness: int) -> None: ...
@abstractmethod
def calibrator_off(self) -> None: ...
@abstractmethod
def open_cover(self) -> None: ...
@abstractmethod
def close_cover(self) -> None: ...
@abstractmethod
def halt_cover(self) -> None: ...
@abstractmethod
def set_dew(self, pct: int) -> None: ...
# ─────────────────────────────────────────────────────────────────────────────
# WiFi / ASCOM Alpaca backend
# ─────────────────────────────────────────────────────────────────────────────
class AlpacaBackend(Backend):
_TIMEOUT = 4
def __init__(self, host: str, port: int):
self._base = f"http://{host}:{port}/api/v1/covercalibrator/0"
self._tx = 0
self._lock = threading.Lock()
self._session = requests.Session()
def _tx_id(self) -> int:
with self._lock:
self._tx += 1
return self._tx
def _get(self, prop: str):
url = f"{self._base}/{prop}?ClientID=1&ClientTransactionID={self._tx_id()}"
r = self._session.get(url, timeout=self._TIMEOUT)
r.raise_for_status()
j = r.json()
if j.get("ErrorNumber", -1) != 0:
raise IOError(j.get("ErrorMessage", "Alpaca error"))
return j.get("Value")
def _put(self, endpoint: str, **params):
body = {"ClientID": 1, "ClientTransactionID": self._tx_id(), **params}
url = f"{self._base}/{endpoint}"
r = self._session.put(url, data=body, timeout=self._TIMEOUT)
r.raise_for_status()
j = r.json()
if j.get("ErrorNumber", -1) != 0:
raise IOError(j.get("ErrorMessage", "Alpaca error"))
def connect(self) -> None:
self._get("interfaceversion")
self._put("connected", Connected="true")
def disconnect(self) -> None:
try:
self._put("connected", Connected="false")
except Exception:
pass
self._session.close()
def get_state(self) -> DeviceState:
s = DeviceState()
s.cover = int(self._get("coverstate"))
s.calibrator = int(self._get("calibratorstate"))
s.brightness = int(self._get("brightness"))
s.max_brightness = int(self._get("maxbrightness"))
try:
s.dew = int(self._get("dewheater"))
except Exception:
s.dew = 0
return s
def calibrator_on(self, brightness: int) -> None:
self._put("calibratoron", Brightness=brightness)
def calibrator_off(self) -> None:
self._put("calibratoroff")
def open_cover(self) -> None:
self._put("opencover")
def close_cover(self) -> None:
self._put("closecover")
def halt_cover(self) -> None:
self._put("haltcover")
def set_dew(self, pct: int) -> None:
self._put("dewheater", Percentage=pct)
# ─────────────────────────────────────────────────────────────────────────────
# USB / Alnitak serial backend
# ─────────────────────────────────────────────────────────────────────────────
class AlnitakBackend(Backend):
_BAUD = 9600
_TIMEOUT = 2.5 # seconds per command
def __init__(self, port: str):
self._port_name = port
self._ser: Optional[serial.Serial] = None
self._lock = threading.Lock()
def _send(self, cmd: str) -> str:
"""Send an Alnitak command; return the stripped response line."""
with self._lock:
if self._ser is None or not self._ser.is_open:
raise IOError("Serial port not open")
self._ser.reset_input_buffer()
self._ser.write((cmd + "\r").encode("ascii"))
self._ser.flush()
# Read until \r or timeout
raw = b""
while True:
chunk = self._ser.read(64)
if not chunk:
break
raw += chunk
if b"\r" in raw:
break
text = raw.decode("ascii", errors="ignore").strip()
# Return first line that starts with '*'
for line in text.splitlines():
if line.startswith("*"):
return line
return text
def _int_payload(self, resp: str) -> int:
"""Extract the numeric payload from *CDDVVV (last 3+ chars)."""
if len(resp) >= 5:
try:
return int(resp[4:])
except ValueError:
pass
return 0
def connect(self) -> None:
import time
self._ser = serial.Serial(self._port_name, self._BAUD, timeout=self._TIMEOUT)
time.sleep(1.5) # wait for Arduino CDC reset
resp = self._send(">P000")
if not resp.startswith("*P"):
raise IOError(f"No valid Alnitak ping response: {resp!r}")
def disconnect(self) -> None:
if self._ser and self._ser.is_open:
self._ser.close()
self._ser = None
def get_state(self) -> DeviceState:
s = DeviceState()
s.max_brightness = 255
state_r = self._send(">S000")
bright_r = self._send(">J000")
dew_r = self._send(">U000")
# State: *SDD<motor1><light1>00
if len(state_r) >= 7:
try:
motor = int(state_r[4])
light = int(state_r[5])
s.cover = {1: 3, 2: 2}.get(motor, 4) # Alnitak 1=Closed→3, 2=Open→2
s.calibrator = 3 if light else 1
except (ValueError, IndexError):
pass
if bright_r.startswith("*J") and len(bright_r) >= 7:
try:
s.brightness = int(bright_r[4:7])
except ValueError:
pass
if dew_r.startswith("*U") and len(dew_r) >= 7:
try:
s.dew = int(dew_r[4:7])
except ValueError:
pass
return s
def calibrator_on(self, brightness: int) -> None:
self._send(f">B{brightness:03d}")
self._send(">L000")
def calibrator_off(self) -> None:
self._send(">D000")
def open_cover(self) -> None:
self._send(">O000")
def close_cover(self) -> None:
self._send(">C000")
def halt_cover(self) -> None:
self._send(">H000")
def set_dew(self, pct: int) -> None:
self._send(f">T{pct:03d}")
# ─────────────────────────────────────────────────────────────────────────────
# Background polling thread
# ─────────────────────────────────────────────────────────────────────────────
class PollWorker(QThread):
state_ready = pyqtSignal(object) # DeviceState
poll_error = pyqtSignal(str)
def __init__(self, backend: Backend, parent=None):
super().__init__(parent)
self._backend = backend
self._stop_evt = threading.Event()
def run(self):
while not self._stop_evt.is_set():
try:
state = self._backend.get_state()
self.state_ready.emit(state)
except Exception as exc:
self.poll_error.emit(str(exc))
self._stop_evt.wait(timeout=1.0)
def stop(self):
self._stop_evt.set()
self.wait(3000)
# ─────────────────────────────────────────────────────────────────────────────
# Reusable widgets
# ─────────────────────────────────────────────────────────────────────────────
class StatusDot(QLabel):
"""A coloured ● label used as a status indicator."""
def __init__(self, parent=None):
super().__init__(parent)
self.setFont(QFont("Segoe UI", 11))
self.set_state(1) # unknown / grey
def set_state(self, cover_or_cal: int, table: dict = COVER_LABELS):
text, colour = table.get(cover_or_cal, ("?", "#9e9e9e"))
self.setText(
f'<span style="color:{colour}; font-size:18px;">●</span>'
f'<span style="margin-left:6px; color:#e0e0e0;"> {text}</span>'
)
class LinkedSliderSpin(QWidget):
"""Horizontal slider and spinbox kept in sync."""
value_committed = pyqtSignal(int) # emitted when user stops dragging
def __init__(self, lo: int, hi: int, step: int = 1, parent=None):
super().__init__(parent)
self._updating = False
self.slider = QSlider(Qt.Orientation.Horizontal)
self.slider.setRange(lo, hi)
self.slider.setSingleStep(step)
self.slider.setPageStep(step * 10)
self.spin = QSpinBox()
self.spin.setRange(lo, hi)
self.spin.setSingleStep(step)
self.spin.setFixedWidth(62)
lay = QHBoxLayout(self)
lay.setContentsMargins(0, 0, 0, 0)
lay.addWidget(self.slider, 1)
lay.addWidget(self.spin)
self.slider.sliderReleased.connect(self._slider_released)
self.slider.valueChanged.connect(self._slider_moved)
self.spin.editingFinished.connect(self._spin_finished)
def _slider_moved(self, v: int):
if not self._updating:
self._updating = True
self.spin.setValue(v)
self._updating = False
def _slider_released(self):
self.value_committed.emit(self.slider.value())
def _spin_finished(self):
if not self._updating:
self._updating = True
self.slider.setValue(self.spin.value())
self._updating = False
self.value_committed.emit(self.spin.value())
def set_value_silent(self, v: int):
self._updating = True
self.slider.setValue(v)
self.spin.setValue(v)
self._updating = False
def value(self) -> int:
return self.slider.value()
# ─────────────────────────────────────────────────────────────────────────────
# Main window
# ─────────────────────────────────────────────────────────────────────────────
class MainWindow(QMainWindow):
_log_signal = pyqtSignal(str)
_state_signal = pyqtSignal(object)
_err_signal = pyqtSignal(str)
def __init__(self):
super().__init__()
self.setWindowTitle("ESP32 FlatPanel Controller")
self.setMinimumSize(740, 620)
self._backend: Optional[Backend] = None
self._poller: Optional[PollWorker] = None
self._pool = ThreadPoolExecutor(max_workers=1)
self._settings = QSettings("FlatPanel", "Controller")
self._last_brightness = 100
self._last_dew = 0
self._dew_on = False
self._log_signal.connect(self._append_log)
self._state_signal.connect(self._apply_state)
self._err_signal.connect(self._on_poll_error)
self._build_ui()
self._load_settings()
# ── UI construction ──────────────────────────────────────────────────────
def _build_ui(self):
root = QWidget()
self.setCentralWidget(root)
main_lay = QVBoxLayout(root)
main_lay.setSpacing(8)
main_lay.setContentsMargins(10, 10, 10, 10)
main_lay.addWidget(self._build_connection_panel())
ctrl_row = QHBoxLayout()
ctrl_row.addWidget(self._build_cover_panel(), 1)
ctrl_row.addWidget(self._build_flat_panel(), 1)
ctrl_row.addWidget(self._build_dew_panel(), 1)
main_lay.addLayout(ctrl_row)
main_lay.addWidget(self._build_log_panel(), 1)
self.status_bar = QStatusBar()
self.setStatusBar(self.status_bar)
self.status_bar.showMessage("Not connected")
def _build_connection_panel(self) -> QGroupBox:
gb = QGroupBox("Connection")
lay = QHBoxLayout(gb)
# Mode radio buttons
mode_box = QVBoxLayout()
self.rb_wifi = QRadioButton("WiFi (Alpaca)")
self.rb_usb = QRadioButton("USB (Alnitak)")
self.rb_wifi.setChecked(True)
self._mode_group = QButtonGroup()
self._mode_group.addButton(self.rb_wifi, 0)
self._mode_group.addButton(self.rb_usb, 1)
mode_box.addWidget(self.rb_wifi)
mode_box.addWidget(self.rb_usb)
lay.addLayout(mode_box)
lay.addSpacing(10)
# WiFi settings
self._wifi_widget = QWidget()
wifi_lay = QHBoxLayout(self._wifi_widget)
wifi_lay.setContentsMargins(0, 0, 0, 0)
wifi_lay.addWidget(QLabel("Host:"))
self.le_host = QLineEdit("esp32-flatpanel.local")
self.le_host.setMinimumWidth(200)
wifi_lay.addWidget(self.le_host)
wifi_lay.addWidget(QLabel("Port:"))
self.sb_port = QSpinBox()
self.sb_port.setRange(1, 65535)
self.sb_port.setValue(11111)
self.sb_port.setFixedWidth(70)
wifi_lay.addWidget(self.sb_port)
lay.addWidget(self._wifi_widget)
# USB settings
self._usb_widget = QWidget()
usb_lay = QHBoxLayout(self._usb_widget)
usb_lay.setContentsMargins(0, 0, 0, 0)
usb_lay.addWidget(QLabel("Port:"))
self.cb_serial = QComboBox()
self.cb_serial.setMinimumWidth(180)
self._refresh_ports()
usb_lay.addWidget(self.cb_serial)
btn_refresh = QToolButton()
btn_refresh.setText("")
btn_refresh.setToolTip("Refresh port list")
btn_refresh.clicked.connect(self._refresh_ports)
usb_lay.addWidget(btn_refresh)
lay.addWidget(self._usb_widget)
self._usb_widget.hide()
lay.addStretch()
# Connect button + status
self.btn_connect = QPushButton("Connect")
self.btn_connect.setFixedWidth(100)
self.btn_connect.clicked.connect(self._on_connect_toggle)
lay.addWidget(self.btn_connect)
self.lbl_conn_status = StatusDot()
self.lbl_conn_status.set_state(1)
lay.addWidget(self.lbl_conn_status)
self.rb_wifi.toggled.connect(self._mode_changed)
self.rb_usb.toggled.connect(self._mode_changed)
return gb
def _build_cover_panel(self) -> QGroupBox:
gb = QGroupBox("Cover")
lay = QVBoxLayout(gb)
self.lbl_cover = StatusDot()
self.lbl_cover.set_state(1, COVER_LABELS)
lay.addWidget(self.lbl_cover, alignment=Qt.AlignmentFlag.AlignHCenter)
lay.addSpacing(8)
btn_row = QHBoxLayout()
self.btn_open = self._cmd_btn("Open", "#1565c0", self._do_open_cover)
self.btn_close = self._cmd_btn("Close", "#e65100", self._do_close_cover)
btn_row.addWidget(self.btn_open)
btn_row.addWidget(self.btn_close)
lay.addLayout(btn_row)
self.btn_halt = self._cmd_btn("Halt", "#b71c1c", self._do_halt_cover)
lay.addWidget(self.btn_halt)
lay.addStretch()
self._set_controls_enabled(False)
return gb
def _build_flat_panel(self) -> QGroupBox:
gb = QGroupBox("Flat Panel")
lay = QVBoxLayout(gb)
self.lbl_cal = StatusDot()
self.lbl_cal.set_state(1, CAL_LABELS)
lay.addWidget(self.lbl_cal, alignment=Qt.AlignmentFlag.AlignHCenter)
lay.addSpacing(4)
lay.addWidget(QLabel("Brightness (0 255):"))
self.slider_bright = LinkedSliderSpin(0, 255, 1)
self.slider_bright.set_value_silent(100)
self.slider_bright.value_committed.connect(self._brightness_committed)
lay.addWidget(self.slider_bright)
lay.addSpacing(4)
btn_row = QHBoxLayout()
self.btn_cal_on = self._cmd_btn("Light On", "#2e7d32", self._do_cal_on)
self.btn_cal_off = self._cmd_btn("Light Off", "#555555", self._do_cal_off)
btn_row.addWidget(self.btn_cal_on)
btn_row.addWidget(self.btn_cal_off)
lay.addLayout(btn_row)
lay.addStretch()
return gb
def _build_dew_panel(self) -> QGroupBox:
gb = QGroupBox("Dew Heater (12 V)")
lay = QVBoxLayout(gb)
self.lbl_dew = QLabel(
'<span style="color:#9e9e9e; font-size:18px;">●</span>'
'<span style="color:#e0e0e0;"> OFF 0 %</span>'
)
self.lbl_dew.setFont(QFont("Segoe UI", 11))
lay.addWidget(self.lbl_dew, alignment=Qt.AlignmentFlag.AlignHCenter)
lay.addSpacing(4)
lay.addWidget(QLabel("Power (0 100 %):"))
self.slider_dew = LinkedSliderSpin(0, 100, 5)
self.slider_dew.set_value_silent(0)
self.slider_dew.value_committed.connect(self._dew_committed)
lay.addWidget(self.slider_dew)
lay.addSpacing(4)
btn_row = QHBoxLayout()
self.btn_dew_on = self._cmd_btn("Heater On", "#bf360c", self._do_dew_on)
self.btn_dew_off = self._cmd_btn("Heater Off", "#555555", self._do_dew_off)
btn_row.addWidget(self.btn_dew_on)
btn_row.addWidget(self.btn_dew_off)
lay.addLayout(btn_row)
lay.addStretch()
return gb
def _build_log_panel(self) -> QGroupBox:
gb = QGroupBox("Log")
lay = QVBoxLayout(gb)
self.log_view = QPlainTextEdit()
self.log_view.setReadOnly(True)
self.log_view.setMaximumBlockCount(500)
self.log_view.setFont(QFont("Courier New", 9))
lay.addWidget(self.log_view)
btn_clear = QPushButton("Clear")
btn_clear.setFixedWidth(70)
btn_clear.clicked.connect(self.log_view.clear)
row = QHBoxLayout()
row.addStretch()
row.addWidget(btn_clear)
lay.addLayout(row)
return gb
@staticmethod
def _cmd_btn(label: str, bg: str, slot) -> QPushButton:
btn = QPushButton(label)
btn.setStyleSheet(
f"QPushButton {{ background-color:{bg}; color:white; "
f"border-radius:4px; padding:5px 10px; }}"
f"QPushButton:hover {{ background-color:{bg}dd; }}"
f"QPushButton:disabled {{ background-color:#333; color:#666; }}"
)
btn.clicked.connect(slot)
return btn
# ── Settings ─────────────────────────────────────────────────────────────
def _load_settings(self):
mode = self._settings.value("mode", "wifi")
self.rb_usb.setChecked(mode == "usb")
self.rb_wifi.setChecked(mode == "wifi")
self.le_host.setText(self._settings.value("host", "esp32-flatpanel.local"))
self.sb_port.setValue(int(self._settings.value("port", 11111)))
saved_serial = self._settings.value("serial_port", "")
idx = self.cb_serial.findText(saved_serial)
if idx >= 0:
self.cb_serial.setCurrentIndex(idx)
self._last_brightness = int(self._settings.value("last_brightness", 100))
self._last_dew = int(self._settings.value("last_dew", 0))
self.slider_bright.set_value_silent(self._last_brightness)
self.slider_dew.set_value_silent(self._last_dew)
def _save_settings(self):
self._settings.setValue("mode", "usb" if self.rb_usb.isChecked() else "wifi")
self._settings.setValue("host", self.le_host.text())
self._settings.setValue("port", self.sb_port.value())
self._settings.setValue("serial_port", self.cb_serial.currentText())
self._settings.setValue("last_brightness", self.slider_bright.value())
self._settings.setValue("last_dew", self.slider_dew.value())
# ── UI helpers ────────────────────────────────────────────────────────────
def _mode_changed(self):
usb = self.rb_usb.isChecked()
self._wifi_widget.setVisible(not usb)
self._usb_widget.setVisible(usb)
def _refresh_ports(self):
prev = self.cb_serial.currentText()
self.cb_serial.clear()
ports = [p.device for p in serial.tools.list_ports.comports()]
self.cb_serial.addItems(ports if ports else ["(no ports found)"])
idx = self.cb_serial.findText(prev)
if idx >= 0:
self.cb_serial.setCurrentIndex(idx)
def _set_controls_enabled(self, enabled: bool):
for w in (self.btn_open, self.btn_close, self.btn_halt,
self.btn_cal_on, self.btn_cal_off,
self.btn_dew_on, self.btn_dew_off,
self.slider_bright, self.slider_dew):
w.setEnabled(enabled)
# ── Connection ────────────────────────────────────────────────────────────
def _on_connect_toggle(self):
if self._backend is not None:
self._disconnect()
else:
self._connect()
def _connect(self):
self.btn_connect.setEnabled(False)
self.status_bar.showMessage("Connecting…")
def worker():
try:
if self.rb_usb.isChecked():
b = AlnitakBackend(self.cb_serial.currentText())
else:
b = AlpacaBackend(self.le_host.text(), self.sb_port.value())
b.connect()
return b
except Exception as exc:
return exc
def done(fut):
result = fut.result()
if isinstance(result, Exception):
self._log_signal.emit(f"Connection failed: {result}")
self.status_bar.showMessage("Connection failed")
self.btn_connect.setEnabled(True)
return
self._backend = result
self._save_settings()
self._poller = PollWorker(self._backend)
self._poller.state_ready.connect(self._state_signal)
self._poller.poll_error.connect(self._err_signal)
self._poller.start()
self.btn_connect.setText("Disconnect")
self.btn_connect.setEnabled(True)
self._set_controls_enabled(True)
mode = "USB/Alnitak" if self.rb_usb.isChecked() else f"WiFi ({self.le_host.text()})"
self._log_signal.emit(f"Connected via {mode}")
self.status_bar.showMessage(f"Connected {mode}")
self._pool.submit(worker).add_done_callback(
lambda f: QTimer.singleShot(0, lambda: done(f))
)
def _disconnect(self):
if self._poller:
self._poller.stop()
self._poller = None
if self._backend:
try:
self._backend.disconnect()
except Exception:
pass
self._backend = None
self.btn_connect.setText("Connect")
self._set_controls_enabled(False)
self.lbl_conn_status.set_state(1)
self.lbl_cover.set_state(1, COVER_LABELS)
self.lbl_cal.set_state(1, CAL_LABELS)
self.status_bar.showMessage("Disconnected")
self._log_signal.emit("Disconnected.")
# ── Commands ──────────────────────────────────────────────────────────────
def _run(self, fn, *args, msg=""):
"""Submit a backend command to the thread pool, log result."""
if self._backend is None:
return
def worker():
try:
fn(*args)
if msg:
self._log_signal.emit(msg)
except Exception as e:
self._log_signal.emit(f"ERROR: {e}")
self._pool.submit(worker)
def _do_open_cover(self):
self._run(self._backend.open_cover, msg="→ Open cover")
def _do_close_cover(self):
self._run(self._backend.close_cover, msg="→ Close cover")
def _do_halt_cover(self):
self._run(self._backend.halt_cover, msg="→ Halt cover")
def _do_cal_on(self):
b = self.slider_bright.value()
self._last_brightness = b
self._run(self._backend.calibrator_on, b, msg=f"→ Light ON brightness={b}")
def _do_cal_off(self):
self._run(self._backend.calibrator_off, msg="→ Light OFF")
def _brightness_committed(self, v: int):
self._last_brightness = v
self._run(self._backend.calibrator_on, v, msg=f"→ Brightness set to {v}")
def _do_dew_on(self):
pct = max(self._last_dew, self.slider_dew.value()) or 50
self.slider_dew.set_value_silent(pct)
self._dew_on = True
self._run(self._backend.set_dew, pct, msg=f"→ Dew heater ON {pct}%")
def _do_dew_off(self):
self._last_dew = self.slider_dew.value()
self._dew_on = False
self.slider_dew.set_value_silent(0)
self._run(self._backend.set_dew, 0, msg="→ Dew heater OFF")
def _dew_committed(self, v: int):
self._last_dew = v if v > 0 else self._last_dew
self._run(self._backend.set_dew, v, msg=f"→ Dew heater {v}%")
# ── State update (called on GUI thread via signal) ────────────────────────
def _apply_state(self, state: DeviceState):
self.lbl_conn_status.set_state(3, {3: ("CONNECTED", "#4caf50")})
self.lbl_cover.set_state(state.cover, COVER_LABELS)
self.lbl_cal.set_state(state.calibrator, CAL_LABELS)
self.slider_bright.set_value_silent(state.brightness)
dew = state.dew
colour = "#ff7043" if dew > 0 else "#9e9e9e"
self.lbl_dew.setText(
f'<span style="color:{colour}; font-size:18px;">●</span>'
f'<span style="color:#e0e0e0;"> {"ON" if dew > 0 else "OFF"} {dew}%</span>'
)
self.slider_dew.set_value_silent(dew)
def _on_poll_error(self, msg: str):
self.status_bar.showMessage(f"Poll error: {msg}")
# ── Log ──────────────────────────────────────────────────────────────────
def _append_log(self, text: str):
ts = datetime.now().strftime("%H:%M:%S")
self.log_view.appendPlainText(f"[{ts}] {text}")
# ── Cleanup ───────────────────────────────────────────────────────────────
def closeEvent(self, event):
self._save_settings()
self._disconnect()
self._pool.shutdown(wait=False)
super().closeEvent(event)
# ─────────────────────────────────────────────────────────────────────────────
# Dark theme
# ─────────────────────────────────────────────────────────────────────────────
def apply_dark_theme(app: QApplication):
app.setStyle("Fusion")
pal = QPalette()
C = QColor
pal.setColor(QPalette.ColorRole.Window, C(26, 26, 26))
pal.setColor(QPalette.ColorRole.WindowText, C(224, 224, 224))
pal.setColor(QPalette.ColorRole.Base, C(38, 38, 38))
pal.setColor(QPalette.ColorRole.AlternateBase, C(45, 45, 45))
pal.setColor(QPalette.ColorRole.ToolTipBase, C(50, 50, 50))
pal.setColor(QPalette.ColorRole.ToolTipText, C(220, 220, 220))
pal.setColor(QPalette.ColorRole.Text, C(224, 224, 224))
pal.setColor(QPalette.ColorRole.Button, C(50, 50, 50))
pal.setColor(QPalette.ColorRole.ButtonText, C(224, 224, 224))
pal.setColor(QPalette.ColorRole.BrightText, C(255, 255, 255))
pal.setColor(QPalette.ColorRole.Link, C(66, 165, 245))
pal.setColor(QPalette.ColorRole.Highlight, C(58, 122, 200))
pal.setColor(QPalette.ColorRole.HighlightedText, C(255, 255, 255))
pal.setColor(QPalette.ColorGroup.Disabled,
QPalette.ColorRole.ButtonText, C(100, 100, 100))
pal.setColor(QPalette.ColorGroup.Disabled,
QPalette.ColorRole.Text, C(100, 100, 100))
app.setPalette(pal)
app.setStyleSheet("""
QGroupBox {
border: 1px solid #404040;
border-radius: 6px;
margin-top: 10px;
padding-top: 6px;
font-weight: bold;
}
QGroupBox::title {
subcontrol-origin: margin;
left: 10px;
padding: 0 4px;
color: #90caf9;
}
QSlider::groove:horizontal {
height: 6px;
background: #404040;
border-radius: 3px;
}
QSlider::handle:horizontal {
background: #3a7ac8;
width: 16px;
height: 16px;
border-radius: 8px;
margin: -5px 0;
}
QSlider::sub-page:horizontal {
background: #3a7ac8;
border-radius: 3px;
}
QLineEdit, QSpinBox, QComboBox, QPlainTextEdit {
background: #2a2a2a;
border: 1px solid #505050;
border-radius: 4px;
padding: 3px;
}
QPushButton {
border-radius: 4px;
padding: 5px 12px;
}
QPushButton:default {
background-color: #3a7ac8;
color: white;
}
""")
# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────
def main():
app = QApplication(sys.argv)
app.setApplicationName("ESP32 FlatPanel Controller")
app.setOrganizationName("FlatPanel")
apply_dark_theme(app)
win = MainWindow()
win.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()