#!/usr/bin/env python3 """ ESP32 FlatPanel Controller =========================== Manual control application for the ESP32 telescope flat panel. Supports: WiFi mode – ASCOM Alpaca REST API over HTTP USB mode – Alnitak serial protocol at 9600 baud Requirements: pip install PyQt6 requests pyserial """ from __future__ import annotations import sys import threading from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor from datetime import datetime from typing import Optional try: import requests except ImportError: print("ERROR: 'requests' not installed. Run: pip install requests") sys.exit(1) try: import serial import serial.tools.list_ports except ImportError: print("ERROR: 'pyserial' not installed. Run: pip install pyserial") sys.exit(1) try: from PyQt6.QtCore import (Qt, QTimer, QSettings, QThread, QSize, pyqtSignal, QObject) from PyQt6.QtGui import QColor, QPalette, QFont, QIcon, QPixmap from PyQt6.QtWidgets import ( QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGroupBox, QLabel, QPushButton, QSlider, QComboBox, QLineEdit, QPlainTextEdit, QRadioButton, QButtonGroup, QFrame, QSplitter, QSizePolicy, QSpacerItem, QSpinBox, QToolButton, QStatusBar, ) except ImportError: print("ERROR: 'PyQt6' not installed. Run: pip install PyQt6") sys.exit(1) # ───────────────────────────────────────────────────────────────────────────── # Cover / calibrator state codes (ASCOM Alpaca) # ───────────────────────────────────────────────────────────────────────────── COVER_LABELS = { 0: ("NOT PRESENT", "#9e9e9e"), 1: ("UNKNOWN", "#9e9e9e"), 2: ("OPEN", "#42a5f5"), # blue – panel clear of telescope 3: ("CLOSED", "#ff9800"), # orange – panel in flat position 4: ("MOVING", "#ffeb3b"), # yellow 5: ("ERROR", "#ef5350"), } CAL_LABELS = { 0: ("N/A", "#9e9e9e"), 1: ("OFF", "#9e9e9e"), 2: ("NOT READY", "#ffeb3b"), 3: ("ON", "#4caf50"), 4: ("UNKNOWN", "#9e9e9e"), 5: ("ERROR", "#ef5350"), } # ───────────────────────────────────────────────────────────────────────────── # Device state dataclass # ───────────────────────────────────────────────────────────────────────────── class DeviceState: __slots__ = ("cover", "calibrator", "brightness", "max_brightness", "dew") def __init__(self): self.cover: int = 1 # CoverState.Unknown self.calibrator: int = 1 # CalibratorState.Off self.brightness: int = 0 self.max_brightness: int = 255 self.dew: int = 0 # ───────────────────────────────────────────────────────────────────────────── # Abstract backend protocol # ───────────────────────────────────────────────────────────────────────────── class Backend(ABC): @abstractmethod def connect(self) -> None: ... @abstractmethod def disconnect(self) -> None: ... @abstractmethod def get_state(self) -> DeviceState: ... @abstractmethod def calibrator_on(self, brightness: int) -> None: ... @abstractmethod def calibrator_off(self) -> None: ... @abstractmethod def open_cover(self) -> None: ... @abstractmethod def close_cover(self) -> None: ... @abstractmethod def halt_cover(self) -> None: ... @abstractmethod def set_dew(self, pct: int) -> None: ... # ───────────────────────────────────────────────────────────────────────────── # WiFi / ASCOM Alpaca backend # ───────────────────────────────────────────────────────────────────────────── class AlpacaBackend(Backend): _TIMEOUT = 4 def __init__(self, host: str, port: int): self._base = f"http://{host}:{port}/api/v1/covercalibrator/0" self._tx = 0 self._lock = threading.Lock() self._session = requests.Session() def _tx_id(self) -> int: with self._lock: self._tx += 1 return self._tx def _get(self, prop: str): url = f"{self._base}/{prop}?ClientID=1&ClientTransactionID={self._tx_id()}" r = self._session.get(url, timeout=self._TIMEOUT) r.raise_for_status() j = r.json() if j.get("ErrorNumber", -1) != 0: raise IOError(j.get("ErrorMessage", "Alpaca error")) return j.get("Value") def _put(self, endpoint: str, **params): body = {"ClientID": 1, "ClientTransactionID": self._tx_id(), **params} url = f"{self._base}/{endpoint}" r = self._session.put(url, data=body, timeout=self._TIMEOUT) r.raise_for_status() j = r.json() if j.get("ErrorNumber", -1) != 0: raise IOError(j.get("ErrorMessage", "Alpaca error")) def connect(self) -> None: self._get("interfaceversion") self._put("connected", Connected="true") def disconnect(self) -> None: try: self._put("connected", Connected="false") except Exception: pass self._session.close() def get_state(self) -> DeviceState: s = DeviceState() s.cover = int(self._get("coverstate")) s.calibrator = int(self._get("calibratorstate")) s.brightness = int(self._get("brightness")) s.max_brightness = int(self._get("maxbrightness")) try: s.dew = int(self._get("dewheater")) except Exception: s.dew = 0 return s def calibrator_on(self, brightness: int) -> None: self._put("calibratoron", Brightness=brightness) def calibrator_off(self) -> None: self._put("calibratoroff") def open_cover(self) -> None: self._put("opencover") def close_cover(self) -> None: self._put("closecover") def halt_cover(self) -> None: self._put("haltcover") def set_dew(self, pct: int) -> None: self._put("dewheater", Percentage=pct) # ───────────────────────────────────────────────────────────────────────────── # USB / Alnitak serial backend # ───────────────────────────────────────────────────────────────────────────── class AlnitakBackend(Backend): _BAUD = 9600 _TIMEOUT = 2.5 # seconds per command def __init__(self, port: str): self._port_name = port self._ser: Optional[serial.Serial] = None self._lock = threading.Lock() def _send(self, cmd: str) -> str: """Send an Alnitak command; return the stripped response line.""" with self._lock: if self._ser is None or not self._ser.is_open: raise IOError("Serial port not open") self._ser.reset_input_buffer() self._ser.write((cmd + "\r").encode("ascii")) self._ser.flush() # Read until \r or timeout raw = b"" while True: chunk = self._ser.read(64) if not chunk: break raw += chunk if b"\r" in raw: break text = raw.decode("ascii", errors="ignore").strip() # Return first line that starts with '*' for line in text.splitlines(): if line.startswith("*"): return line return text def _int_payload(self, resp: str) -> int: """Extract the numeric payload from *CDDVVV (last 3+ chars).""" if len(resp) >= 5: try: return int(resp[4:]) except ValueError: pass return 0 def connect(self) -> None: import time self._ser = serial.Serial(self._port_name, self._BAUD, timeout=self._TIMEOUT) time.sleep(1.5) # wait for Arduino CDC reset resp = self._send(">P000") if not resp.startswith("*P"): raise IOError(f"No valid Alnitak ping response: {resp!r}") def disconnect(self) -> None: if self._ser and self._ser.is_open: self._ser.close() self._ser = None def get_state(self) -> DeviceState: s = DeviceState() s.max_brightness = 255 state_r = self._send(">S000") bright_r = self._send(">J000") dew_r = self._send(">U000") # State: *SDD00 if len(state_r) >= 7: try: motor = int(state_r[4]) light = int(state_r[5]) s.cover = {1: 3, 2: 2}.get(motor, 4) # Alnitak 1=Closed→3, 2=Open→2 s.calibrator = 3 if light else 1 except (ValueError, IndexError): pass if bright_r.startswith("*J") and len(bright_r) >= 7: try: s.brightness = int(bright_r[4:7]) except ValueError: pass if dew_r.startswith("*U") and len(dew_r) >= 7: try: s.dew = int(dew_r[4:7]) except ValueError: pass return s def calibrator_on(self, brightness: int) -> None: self._send(f">B{brightness:03d}") self._send(">L000") def calibrator_off(self) -> None: self._send(">D000") def open_cover(self) -> None: self._send(">O000") def close_cover(self) -> None: self._send(">C000") def halt_cover(self) -> None: self._send(">H000") def set_dew(self, pct: int) -> None: self._send(f">T{pct:03d}") # ───────────────────────────────────────────────────────────────────────────── # Background polling thread # ───────────────────────────────────────────────────────────────────────────── class PollWorker(QThread): state_ready = pyqtSignal(object) # DeviceState poll_error = pyqtSignal(str) def __init__(self, backend: Backend, parent=None): super().__init__(parent) self._backend = backend self._stop_evt = threading.Event() def run(self): while not self._stop_evt.is_set(): try: state = self._backend.get_state() self.state_ready.emit(state) except Exception as exc: self.poll_error.emit(str(exc)) self._stop_evt.wait(timeout=1.0) def stop(self): self._stop_evt.set() self.wait(3000) # ───────────────────────────────────────────────────────────────────────────── # Reusable widgets # ───────────────────────────────────────────────────────────────────────────── class StatusDot(QLabel): """A coloured ● label used as a status indicator.""" def __init__(self, parent=None): super().__init__(parent) self.setFont(QFont("Segoe UI", 11)) self.set_state(1) # unknown / grey def set_state(self, cover_or_cal: int, table: dict = COVER_LABELS): text, colour = table.get(cover_or_cal, ("?", "#9e9e9e")) self.setText( f'' f' {text}' ) class LinkedSliderSpin(QWidget): """Horizontal slider and spinbox kept in sync.""" value_committed = pyqtSignal(int) # emitted when user stops dragging def __init__(self, lo: int, hi: int, step: int = 1, parent=None): super().__init__(parent) self._updating = False self.slider = QSlider(Qt.Orientation.Horizontal) self.slider.setRange(lo, hi) self.slider.setSingleStep(step) self.slider.setPageStep(step * 10) self.spin = QSpinBox() self.spin.setRange(lo, hi) self.spin.setSingleStep(step) self.spin.setFixedWidth(62) lay = QHBoxLayout(self) lay.setContentsMargins(0, 0, 0, 0) lay.addWidget(self.slider, 1) lay.addWidget(self.spin) self.slider.sliderReleased.connect(self._slider_released) self.slider.valueChanged.connect(self._slider_moved) self.spin.editingFinished.connect(self._spin_finished) def _slider_moved(self, v: int): if not self._updating: self._updating = True self.spin.setValue(v) self._updating = False def _slider_released(self): self.value_committed.emit(self.slider.value()) def _spin_finished(self): if not self._updating: self._updating = True self.slider.setValue(self.spin.value()) self._updating = False self.value_committed.emit(self.spin.value()) def set_value_silent(self, v: int): self._updating = True self.slider.setValue(v) self.spin.setValue(v) self._updating = False def value(self) -> int: return self.slider.value() # ───────────────────────────────────────────────────────────────────────────── # Main window # ───────────────────────────────────────────────────────────────────────────── class MainWindow(QMainWindow): _log_signal = pyqtSignal(str) _state_signal = pyqtSignal(object) _err_signal = pyqtSignal(str) def __init__(self): super().__init__() self.setWindowTitle("ESP32 FlatPanel Controller") self.setMinimumSize(740, 620) self._backend: Optional[Backend] = None self._poller: Optional[PollWorker] = None self._pool = ThreadPoolExecutor(max_workers=1) self._settings = QSettings("FlatPanel", "Controller") self._last_brightness = 100 self._last_dew = 0 self._dew_on = False self._log_signal.connect(self._append_log) self._state_signal.connect(self._apply_state) self._err_signal.connect(self._on_poll_error) self._build_ui() self._load_settings() # ── UI construction ────────────────────────────────────────────────────── def _build_ui(self): root = QWidget() self.setCentralWidget(root) main_lay = QVBoxLayout(root) main_lay.setSpacing(8) main_lay.setContentsMargins(10, 10, 10, 10) main_lay.addWidget(self._build_connection_panel()) ctrl_row = QHBoxLayout() ctrl_row.addWidget(self._build_cover_panel(), 1) ctrl_row.addWidget(self._build_flat_panel(), 1) ctrl_row.addWidget(self._build_dew_panel(), 1) main_lay.addLayout(ctrl_row) main_lay.addWidget(self._build_log_panel(), 1) self.status_bar = QStatusBar() self.setStatusBar(self.status_bar) self.status_bar.showMessage("Not connected") def _build_connection_panel(self) -> QGroupBox: gb = QGroupBox("Connection") lay = QHBoxLayout(gb) # Mode radio buttons mode_box = QVBoxLayout() self.rb_wifi = QRadioButton("WiFi (Alpaca)") self.rb_usb = QRadioButton("USB (Alnitak)") self.rb_wifi.setChecked(True) self._mode_group = QButtonGroup() self._mode_group.addButton(self.rb_wifi, 0) self._mode_group.addButton(self.rb_usb, 1) mode_box.addWidget(self.rb_wifi) mode_box.addWidget(self.rb_usb) lay.addLayout(mode_box) lay.addSpacing(10) # WiFi settings self._wifi_widget = QWidget() wifi_lay = QHBoxLayout(self._wifi_widget) wifi_lay.setContentsMargins(0, 0, 0, 0) wifi_lay.addWidget(QLabel("Host:")) self.le_host = QLineEdit("esp32-flatpanel.local") self.le_host.setMinimumWidth(200) wifi_lay.addWidget(self.le_host) wifi_lay.addWidget(QLabel("Port:")) self.sb_port = QSpinBox() self.sb_port.setRange(1, 65535) self.sb_port.setValue(11111) self.sb_port.setFixedWidth(70) wifi_lay.addWidget(self.sb_port) lay.addWidget(self._wifi_widget) # USB settings self._usb_widget = QWidget() usb_lay = QHBoxLayout(self._usb_widget) usb_lay.setContentsMargins(0, 0, 0, 0) usb_lay.addWidget(QLabel("Port:")) self.cb_serial = QComboBox() self.cb_serial.setMinimumWidth(180) self._refresh_ports() usb_lay.addWidget(self.cb_serial) btn_refresh = QToolButton() btn_refresh.setText("⟳") btn_refresh.setToolTip("Refresh port list") btn_refresh.clicked.connect(self._refresh_ports) usb_lay.addWidget(btn_refresh) lay.addWidget(self._usb_widget) self._usb_widget.hide() lay.addStretch() # Connect button + status self.btn_connect = QPushButton("Connect") self.btn_connect.setFixedWidth(100) self.btn_connect.clicked.connect(self._on_connect_toggle) lay.addWidget(self.btn_connect) self.lbl_conn_status = StatusDot() self.lbl_conn_status.set_state(1) lay.addWidget(self.lbl_conn_status) self.rb_wifi.toggled.connect(self._mode_changed) self.rb_usb.toggled.connect(self._mode_changed) return gb def _build_cover_panel(self) -> QGroupBox: gb = QGroupBox("Cover") lay = QVBoxLayout(gb) self.lbl_cover = StatusDot() self.lbl_cover.set_state(1, COVER_LABELS) lay.addWidget(self.lbl_cover, alignment=Qt.AlignmentFlag.AlignHCenter) lay.addSpacing(8) btn_row = QHBoxLayout() self.btn_open = self._cmd_btn("Open", "#1565c0", self._do_open_cover) self.btn_close = self._cmd_btn("Close", "#e65100", self._do_close_cover) btn_row.addWidget(self.btn_open) btn_row.addWidget(self.btn_close) lay.addLayout(btn_row) self.btn_halt = self._cmd_btn("Halt", "#b71c1c", self._do_halt_cover) lay.addWidget(self.btn_halt) lay.addStretch() self._set_controls_enabled(False) return gb def _build_flat_panel(self) -> QGroupBox: gb = QGroupBox("Flat Panel") lay = QVBoxLayout(gb) self.lbl_cal = StatusDot() self.lbl_cal.set_state(1, CAL_LABELS) lay.addWidget(self.lbl_cal, alignment=Qt.AlignmentFlag.AlignHCenter) lay.addSpacing(4) lay.addWidget(QLabel("Brightness (0 – 255):")) self.slider_bright = LinkedSliderSpin(0, 255, 1) self.slider_bright.set_value_silent(100) self.slider_bright.value_committed.connect(self._brightness_committed) lay.addWidget(self.slider_bright) lay.addSpacing(4) btn_row = QHBoxLayout() self.btn_cal_on = self._cmd_btn("Light On", "#2e7d32", self._do_cal_on) self.btn_cal_off = self._cmd_btn("Light Off", "#555555", self._do_cal_off) btn_row.addWidget(self.btn_cal_on) btn_row.addWidget(self.btn_cal_off) lay.addLayout(btn_row) lay.addStretch() return gb def _build_dew_panel(self) -> QGroupBox: gb = QGroupBox("Dew Heater (12 V)") lay = QVBoxLayout(gb) self.lbl_dew = QLabel( '' ' OFF 0 %' ) self.lbl_dew.setFont(QFont("Segoe UI", 11)) lay.addWidget(self.lbl_dew, alignment=Qt.AlignmentFlag.AlignHCenter) lay.addSpacing(4) lay.addWidget(QLabel("Power (0 – 100 %):")) self.slider_dew = LinkedSliderSpin(0, 100, 5) self.slider_dew.set_value_silent(0) self.slider_dew.value_committed.connect(self._dew_committed) lay.addWidget(self.slider_dew) lay.addSpacing(4) btn_row = QHBoxLayout() self.btn_dew_on = self._cmd_btn("Heater On", "#bf360c", self._do_dew_on) self.btn_dew_off = self._cmd_btn("Heater Off", "#555555", self._do_dew_off) btn_row.addWidget(self.btn_dew_on) btn_row.addWidget(self.btn_dew_off) lay.addLayout(btn_row) lay.addStretch() return gb def _build_log_panel(self) -> QGroupBox: gb = QGroupBox("Log") lay = QVBoxLayout(gb) self.log_view = QPlainTextEdit() self.log_view.setReadOnly(True) self.log_view.setMaximumBlockCount(500) self.log_view.setFont(QFont("Courier New", 9)) lay.addWidget(self.log_view) btn_clear = QPushButton("Clear") btn_clear.setFixedWidth(70) btn_clear.clicked.connect(self.log_view.clear) row = QHBoxLayout() row.addStretch() row.addWidget(btn_clear) lay.addLayout(row) return gb @staticmethod def _cmd_btn(label: str, bg: str, slot) -> QPushButton: btn = QPushButton(label) btn.setStyleSheet( f"QPushButton {{ background-color:{bg}; color:white; " f"border-radius:4px; padding:5px 10px; }}" f"QPushButton:hover {{ background-color:{bg}dd; }}" f"QPushButton:disabled {{ background-color:#333; color:#666; }}" ) btn.clicked.connect(slot) return btn # ── Settings ───────────────────────────────────────────────────────────── def _load_settings(self): mode = self._settings.value("mode", "wifi") self.rb_usb.setChecked(mode == "usb") self.rb_wifi.setChecked(mode == "wifi") self.le_host.setText(self._settings.value("host", "esp32-flatpanel.local")) self.sb_port.setValue(int(self._settings.value("port", 11111))) saved_serial = self._settings.value("serial_port", "") idx = self.cb_serial.findText(saved_serial) if idx >= 0: self.cb_serial.setCurrentIndex(idx) self._last_brightness = int(self._settings.value("last_brightness", 100)) self._last_dew = int(self._settings.value("last_dew", 0)) self.slider_bright.set_value_silent(self._last_brightness) self.slider_dew.set_value_silent(self._last_dew) def _save_settings(self): self._settings.setValue("mode", "usb" if self.rb_usb.isChecked() else "wifi") self._settings.setValue("host", self.le_host.text()) self._settings.setValue("port", self.sb_port.value()) self._settings.setValue("serial_port", self.cb_serial.currentText()) self._settings.setValue("last_brightness", self.slider_bright.value()) self._settings.setValue("last_dew", self.slider_dew.value()) # ── UI helpers ──────────────────────────────────────────────────────────── def _mode_changed(self): usb = self.rb_usb.isChecked() self._wifi_widget.setVisible(not usb) self._usb_widget.setVisible(usb) def _refresh_ports(self): prev = self.cb_serial.currentText() self.cb_serial.clear() ports = [p.device for p in serial.tools.list_ports.comports()] self.cb_serial.addItems(ports if ports else ["(no ports found)"]) idx = self.cb_serial.findText(prev) if idx >= 0: self.cb_serial.setCurrentIndex(idx) def _set_controls_enabled(self, enabled: bool): for w in (self.btn_open, self.btn_close, self.btn_halt, self.btn_cal_on, self.btn_cal_off, self.btn_dew_on, self.btn_dew_off, self.slider_bright, self.slider_dew): w.setEnabled(enabled) # ── Connection ──────────────────────────────────────────────────────────── def _on_connect_toggle(self): if self._backend is not None: self._disconnect() else: self._connect() def _connect(self): self.btn_connect.setEnabled(False) self.status_bar.showMessage("Connecting…") def worker(): try: if self.rb_usb.isChecked(): b = AlnitakBackend(self.cb_serial.currentText()) else: b = AlpacaBackend(self.le_host.text(), self.sb_port.value()) b.connect() return b except Exception as exc: return exc def done(fut): result = fut.result() if isinstance(result, Exception): self._log_signal.emit(f"Connection failed: {result}") self.status_bar.showMessage("Connection failed") self.btn_connect.setEnabled(True) return self._backend = result self._save_settings() self._poller = PollWorker(self._backend) self._poller.state_ready.connect(self._state_signal) self._poller.poll_error.connect(self._err_signal) self._poller.start() self.btn_connect.setText("Disconnect") self.btn_connect.setEnabled(True) self._set_controls_enabled(True) mode = "USB/Alnitak" if self.rb_usb.isChecked() else f"WiFi ({self.le_host.text()})" self._log_signal.emit(f"Connected via {mode}") self.status_bar.showMessage(f"Connected – {mode}") self._pool.submit(worker).add_done_callback( lambda f: QTimer.singleShot(0, lambda: done(f)) ) def _disconnect(self): if self._poller: self._poller.stop() self._poller = None if self._backend: try: self._backend.disconnect() except Exception: pass self._backend = None self.btn_connect.setText("Connect") self._set_controls_enabled(False) self.lbl_conn_status.set_state(1) self.lbl_cover.set_state(1, COVER_LABELS) self.lbl_cal.set_state(1, CAL_LABELS) self.status_bar.showMessage("Disconnected") self._log_signal.emit("Disconnected.") # ── Commands ────────────────────────────────────────────────────────────── def _run(self, fn, *args, msg=""): """Submit a backend command to the thread pool, log result.""" if self._backend is None: return def worker(): try: fn(*args) if msg: self._log_signal.emit(msg) except Exception as e: self._log_signal.emit(f"ERROR: {e}") self._pool.submit(worker) def _do_open_cover(self): self._run(self._backend.open_cover, msg="→ Open cover") def _do_close_cover(self): self._run(self._backend.close_cover, msg="→ Close cover") def _do_halt_cover(self): self._run(self._backend.halt_cover, msg="→ Halt cover") def _do_cal_on(self): b = self.slider_bright.value() self._last_brightness = b self._run(self._backend.calibrator_on, b, msg=f"→ Light ON brightness={b}") def _do_cal_off(self): self._run(self._backend.calibrator_off, msg="→ Light OFF") def _brightness_committed(self, v: int): self._last_brightness = v self._run(self._backend.calibrator_on, v, msg=f"→ Brightness set to {v}") def _do_dew_on(self): pct = max(self._last_dew, self.slider_dew.value()) or 50 self.slider_dew.set_value_silent(pct) self._dew_on = True self._run(self._backend.set_dew, pct, msg=f"→ Dew heater ON {pct}%") def _do_dew_off(self): self._last_dew = self.slider_dew.value() self._dew_on = False self.slider_dew.set_value_silent(0) self._run(self._backend.set_dew, 0, msg="→ Dew heater OFF") def _dew_committed(self, v: int): self._last_dew = v if v > 0 else self._last_dew self._run(self._backend.set_dew, v, msg=f"→ Dew heater {v}%") # ── State update (called on GUI thread via signal) ──────────────────────── def _apply_state(self, state: DeviceState): self.lbl_conn_status.set_state(3, {3: ("CONNECTED", "#4caf50")}) self.lbl_cover.set_state(state.cover, COVER_LABELS) self.lbl_cal.set_state(state.calibrator, CAL_LABELS) self.slider_bright.set_value_silent(state.brightness) dew = state.dew colour = "#ff7043" if dew > 0 else "#9e9e9e" self.lbl_dew.setText( f'' f' {"ON" if dew > 0 else "OFF"} {dew}%' ) self.slider_dew.set_value_silent(dew) def _on_poll_error(self, msg: str): self.status_bar.showMessage(f"Poll error: {msg}") # ── Log ────────────────────────────────────────────────────────────────── def _append_log(self, text: str): ts = datetime.now().strftime("%H:%M:%S") self.log_view.appendPlainText(f"[{ts}] {text}") # ── Cleanup ─────────────────────────────────────────────────────────────── def closeEvent(self, event): self._save_settings() self._disconnect() self._pool.shutdown(wait=False) super().closeEvent(event) # ───────────────────────────────────────────────────────────────────────────── # Dark theme # ───────────────────────────────────────────────────────────────────────────── def apply_dark_theme(app: QApplication): app.setStyle("Fusion") pal = QPalette() C = QColor pal.setColor(QPalette.ColorRole.Window, C(26, 26, 26)) pal.setColor(QPalette.ColorRole.WindowText, C(224, 224, 224)) pal.setColor(QPalette.ColorRole.Base, C(38, 38, 38)) pal.setColor(QPalette.ColorRole.AlternateBase, C(45, 45, 45)) pal.setColor(QPalette.ColorRole.ToolTipBase, C(50, 50, 50)) pal.setColor(QPalette.ColorRole.ToolTipText, C(220, 220, 220)) pal.setColor(QPalette.ColorRole.Text, C(224, 224, 224)) pal.setColor(QPalette.ColorRole.Button, C(50, 50, 50)) pal.setColor(QPalette.ColorRole.ButtonText, C(224, 224, 224)) pal.setColor(QPalette.ColorRole.BrightText, C(255, 255, 255)) pal.setColor(QPalette.ColorRole.Link, C(66, 165, 245)) pal.setColor(QPalette.ColorRole.Highlight, C(58, 122, 200)) pal.setColor(QPalette.ColorRole.HighlightedText, C(255, 255, 255)) pal.setColor(QPalette.ColorGroup.Disabled, QPalette.ColorRole.ButtonText, C(100, 100, 100)) pal.setColor(QPalette.ColorGroup.Disabled, QPalette.ColorRole.Text, C(100, 100, 100)) app.setPalette(pal) app.setStyleSheet(""" QGroupBox { border: 1px solid #404040; border-radius: 6px; margin-top: 10px; padding-top: 6px; font-weight: bold; } QGroupBox::title { subcontrol-origin: margin; left: 10px; padding: 0 4px; color: #90caf9; } QSlider::groove:horizontal { height: 6px; background: #404040; border-radius: 3px; } QSlider::handle:horizontal { background: #3a7ac8; width: 16px; height: 16px; border-radius: 8px; margin: -5px 0; } QSlider::sub-page:horizontal { background: #3a7ac8; border-radius: 3px; } QLineEdit, QSpinBox, QComboBox, QPlainTextEdit { background: #2a2a2a; border: 1px solid #505050; border-radius: 4px; padding: 3px; } QPushButton { border-radius: 4px; padding: 5px 12px; } QPushButton:default { background-color: #3a7ac8; color: white; } """) # ───────────────────────────────────────────────────────────────────────────── # Entry point # ───────────────────────────────────────────────────────────────────────────── def main(): app = QApplication(sys.argv) app.setApplicationName("ESP32 FlatPanel Controller") app.setOrganizationName("FlatPanel") apply_dark_theme(app) win = MainWindow() win.show() sys.exit(app.exec()) if __name__ == "__main__": main()