#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Alice plugin: CLI + web UI for preparing and operating the integration.

What the module does:
- fetches stations from the Yandex cloud and from local mDNS;
- merges the data into a single `stations.json`;
- generates templates for Loxone and wb-rules;
- serves a web API for the initial setup.
"""

import argparse
import asyncio
import html
import ipaddress
import json
import os
import platform
import shutil
import socket
import subprocess
import sys
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote, urlparse

import aiohttp
from aiohttp import web

import yastation

BASE_DIR = Path(__file__).resolve().parent
DATA_DIR = BASE_DIR / "data"
CONFIGS_DIR = BASE_DIR / "configs"
LOXONE_DIR = CONFIGS_DIR / "loxone"
WB_RULES_DIR = CONFIGS_DIR / "wb-rules"
STATIONS_PATH = DATA_DIR / "stations.json"
CONFIG_PATH = DATA_DIR / "config.json"
TOKEN_PATH = BASE_DIR / "token.txt"
YANDEX_INFO_URL = "https://api.iot.yandex.net/v1.0/user/info"

# Port of the control API that every generated template points at.
API_PORT = 9124
# Archive file name per template kind (shared by generation and download).
_ZIP_NAMES = {"loxone": "loxone_templates.zip", "wb-rules": "wb_rules_templates.zip"}
# Output directory per template kind.
_TEMPLATE_DIRS = {"loxone": LOXONE_DIR, "wb-rules": WB_RULES_DIR}
# Files that make up one station's template set, per kind.
_TEMPLATE_FILES = {
    "loxone": ("VO.xml", "VI.xml", "README.md"),
    "wb-rules": ("wb-rules-station.js", "README.md"),
}


@dataclass
class StationRecord:
    """Normalized station record used for storage and template generation."""

    name: str
    station_id: str
    home: str
    room: str
    model: str
    con_url: str
    local_id: str

    def selector(self) -> str:
        """Pick the best station identifier for API commands.

        Priority:
        1) the host part of `con_url`, when present;
        2) `local_id` (filled from mDNS discovery);
        3) `station_id` (the cloud id).
        """
        host = ""
        if self.con_url:
            value = self.con_url.strip()
            # urlparse only fills `hostname` when a scheme is present.
            if not value.startswith(("http://", "https://")):
                value = f"http://{value}"
            try:
                host = (urlparse(value).hostname or "").strip()
            except ValueError:
                # Malformed authority (e.g. broken IPv6 brackets): fall back to ids.
                host = ""
        return host or self.local_id or self.station_id


def ensure_dirs() -> None:
    """Create the base data/config directory layout if it is missing."""
    for directory in (DATA_DIR, CONFIGS_DIR, LOXONE_DIR, WB_RULES_DIR):
        directory.mkdir(parents=True, exist_ok=True)


def safe_name(value: str, fallback: str) -> str:
    """Turn *value* into a file-system friendly name.

    Every character that is not alphanumeric or one of ``._-`` becomes ``_``;
    leading/trailing ``._-`` are stripped. Returns *fallback* when nothing is left.
    """
    out = "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in (value or "").strip())
    out = out.strip("._-")
    return out or fallback


def load_json(path: Path, default: Any) -> Any:
    """Read JSON from *path*; return *default* if it is missing, unreadable or invalid."""
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing/unreadable file; ValueError: bad JSON or bad UTF-8.
        return default


def save_json(path: Path, payload: Any) -> None:
    """Write *payload* to *path* as pretty-printed UTF-8 JSON with a trailing newline."""
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")


def _load_config() -> Dict[str, Any]:
    """Load `config.json`, always returning a dict (empty when absent or malformed)."""
    cfg = load_json(CONFIG_PATH, {})
    return cfg if isinstance(cfg, dict) else {}


def _api_base(controller_host: str) -> str:
    """Base URL of the control API on *controller_host*."""
    return f"http://{controller_host}:{API_PORT}"


def _station_folder(root: Path, station: StationRecord) -> Path:
    """Per-station template folder under *root* (named after the station, else its id)."""
    return root / safe_name(station.name or station.station_id, station.station_id or "station")


def read_token(cli_token: str) -> str:
    """Read the OAuth token from the available sources, by priority.

    Order: CLI argument, ``YANDEX_TOKEN`` environment variable, `token.txt`,
    then the ``token`` key of `config.json`. CLI/ENV allow a temporary override,
    the files provide persistent storage. Returns ``""`` when nothing is set.
    """
    token = (cli_token or "").strip()
    if token:
        return token
    env_token = os.environ.get("YANDEX_TOKEN", "").strip()
    if env_token:
        return env_token
    if TOKEN_PATH.exists():
        file_token = TOKEN_PATH.read_text(encoding="utf-8").strip()
        # An empty token.txt must not mask a token stored in config.json.
        if file_token:
            return file_token
    return str(_load_config().get("token") or "").strip()


def save_token(token: str) -> None:
    """Persist *token* to `token.txt` (mode 0600) and to `config.json`."""
    token = token.strip()
    # Create the file with restrictive permissions from the start, so the token
    # is never briefly readable under the default umask.
    fd = os.open(str(TOKEN_PATH), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        fh.write(token + "\n")
    # The mode passed to os.open() is ignored for a pre-existing file.
    os.chmod(TOKEN_PATH, 0o600)
    cfg = _load_config()
    cfg["token"] = token
    save_json(CONFIG_PATH, cfg)


def save_controller_host(host: str) -> None:
    """Store the controller host override in `config.json`."""
    cfg = _load_config()
    cfg["controller_host"] = host.strip()
    save_json(CONFIG_PATH, cfg)


def _is_valid_local_ip(value: str) -> bool:
    """True for an IPv4 address that is not loopback, link-local or unspecified."""
    try:
        ip = ipaddress.ip_address(value)
    except ValueError:
        return False
    return bool(ip.version == 4 and not ip.is_loopback and not ip.is_link_local and not ip.is_unspecified)


def _is_loopback_host(value: str) -> bool:
    """True when *value* names the local machine (localhost, 127.x, or IPv6 loopback)."""
    host = (value or "").strip().lower()
    if not host:
        return False
    if host == "localhost" or host.startswith("127."):
        return True
    try:
        # Also recognise IPv6 loopback, with or without URL brackets.
        return ipaddress.ip_address(host.strip("[]")).is_loopback
    except ValueError:
        return False


def _detect_local_ip() -> str:
    """Best-effort detection of the primary LAN IPv4 address.

    Tries three strategies in order and returns the first address accepted by
    `_is_valid_local_ip`; falls back to ``"127.0.0.1"``. Lookup failures are
    ignored on purpose: each strategy is optional.
    """
    # 1) Ask the OS which local address it would use to reach a public IP.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("1.1.1.1", 80))
            ip = str(s.getsockname()[0] or "").strip()
        if _is_valid_local_ip(ip):
            return ip
    except OSError:
        pass

    # 2) Resolve our own hostname via getaddrinfo.
    try:
        host = socket.gethostname()
        for family, _, _, _, sockaddr in socket.getaddrinfo(host, None, socket.AF_INET):
            if family != socket.AF_INET or not sockaddr:
                continue
            ip = str(sockaddr[0] or "").strip()
            if _is_valid_local_ip(ip):
                return ip
    except (OSError, UnicodeError):
        pass

    # 3) Same lookup through the legacy resolver interface.
    try:
        _, _, ips = socket.gethostbyname_ex(socket.gethostname())
        for candidate in ips:
            ip = str(candidate).strip()
            if _is_valid_local_ip(ip):
                return ip
    except (OSError, UnicodeError):
        pass

    return "127.0.0.1"


def detect_controller_host() -> str:
    """Determine the controller address used in template links.

    Explicit settings win (`config.json`, then ``ALICE_CONTROLLER_HOST``);
    otherwise the LAN IP is auto-detected. Loopback values from the explicit
    settings are rejected so that templates used on external clients do not
    end up with dead links.
    """
    cfg_host = str(_load_config().get("controller_host") or "").strip()
    if cfg_host and not _is_loopback_host(cfg_host):
        return cfg_host
    env_host = os.environ.get("ALICE_CONTROLLER_HOST", "").strip()
    if env_host and not _is_loopback_host(env_host):
        return env_host
    return _detect_local_ip()


def normalize_model(model: str) -> str:
    """Strip the ``devices.types.smart_speaker.`` prefix (case-insensitive) from *model*."""
    value = (model or "").strip()
    prefix = "devices.types.smart_speaker."
    if value.lower().startswith(prefix):
        return value[len(prefix):]
    return value


def _dict_items(data: Dict[str, Any], key: str) -> List[Dict[str, Any]]:
    """Return the dict entries of ``data[key]``; anything else is skipped."""
    items = data.get(key) or []
    if not isinstance(items, list):
        return []
    return [item for item in items if isinstance(item, dict)]


def _extract_match_ids(device: Dict[str, Any]) -> List[str]:
    """Collect every known identifier of *device*, lower-cased and de-duplicated in order."""
    quasar = device.get("quasar_info")
    if not isinstance(quasar, dict):
        quasar = {}
    candidates = [
        device.get("id"),
        device.get("device_id"),
        device.get("external_id"),
        quasar.get("device_id"),
        quasar.get("deviceId"),
        quasar.get("external_id"),
        quasar.get("serial"),
        device.get("serial"),
    ]
    normalized = (str(c or "").strip().lower() for c in candidates)
    # dict.fromkeys keeps first-seen order while dropping duplicates.
    return list(dict.fromkeys(n for n in normalized if n))


def _parse_cloud_stations(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Extract smart speakers from a decoded `user/info` payload."""
    homes: Dict[str, str] = {}
    # "homes" takes precedence; "households" only fills ids not seen yet.
    for key in ("homes", "households"):
        for home in _dict_items(data, key):
            hid = str(home.get("id") or "").strip()
            if hid and hid not in homes:
                homes[hid] = str(home.get("name") or "").strip()

    rooms: Dict[str, str] = {}
    room_home: Dict[str, str] = {}
    for room in _dict_items(data, "rooms"):
        rid = str(room.get("id") or "").strip()
        if not rid:
            continue
        rooms[rid] = str(room.get("name") or "").strip()
        home_id = str(room.get("home_id") or room.get("household_id") or room.get("household") or "").strip()
        if home_id and home_id in homes:
            room_home[rid] = homes[home_id]

    result: List[Dict[str, Any]] = []
    for dev in _dict_items(data, "devices"):
        dtype = str(dev.get("type") or "").lower()
        # "smart_speaker" contains "speaker", so one substring test covers both.
        if "speaker" not in dtype:
            continue
        room_id = str(dev.get("room") or "").strip()
        home_id = str(dev.get("home_id") or dev.get("household_id") or dev.get("household") or "").strip()
        # Fall back to the room's home only when the device's home id is unknown.
        home_name = homes.get(home_id, room_home.get(room_id, ""))
        result.append(
            {
                "name": str(dev.get("name") or "").strip(),
                "id": str(dev.get("id") or "").strip(),
                "home": home_name,
                "room": rooms.get(room_id, ""),
                "model": normalize_model(str(dev.get("model") or dev.get("type") or "")),
                "_match_ids": _extract_match_ids(dev),
            }
        )
    return result


async def fetch_cloud_stations(token: str) -> List[Dict[str, Any]]:
    """Fetch stations from the cloud API and normalize their structure.

    Each returned station carries `_match_ids`: the set of identifiers later
    used to match it against local mDNS discovery.

    Raises:
        RuntimeError: on an HTTP status >= 400 or an undecodable/unexpected body.
    """
    headers = {"Authorization": f"OAuth {token}"}
    timeout = aiohttp.ClientTimeout(total=20)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(YANDEX_INFO_URL, headers=headers) as resp:
            text = await resp.text()
            if resp.status >= 400:
                raise RuntimeError(f"Yandex API error {resp.status}: {text[:220]}")
    try:
        data = json.loads(text)
    except ValueError as exc:
        raise RuntimeError(f"Bad Yandex API JSON: {exc}") from exc
    if not isinstance(data, dict):
        raise RuntimeError("Bad Yandex API JSON: expected an object")
    return _parse_cloud_stations(data)


async def discover_local(timeout: float = 3.0) -> List[Dict[str, str]]:
    """Discover stations on the local network via `yastation.discover`."""
    items = await yastation.discover(timeout)
    return [{"id": st.device_id, "name": st.name, "ip": st.host, "platform": st.platform} for st in items]


def merge_cloud_with_local(cloud: List[Dict[str, Any]], local: List[Dict[str, str]]) -> List[StationRecord]:
    """Merge cloud and local station data.

    Matching logic:
    1) strictly by identifiers (`_match_ids`);
    2) by name, but only when that name is unique among local stations;
    3) with no match the IP stays empty (no risky guessing).
    """

    def nrm(value: str) -> str:
        # Collapse whitespace runs and ignore case when comparing names.
        return " ".join((value or "").split()).lower()

    local_by_id: Dict[str, Dict[str, str]] = {}
    local_by_name: Dict[str, Dict[str, str]] = {}
    local_name_count: Dict[str, int] = {}
    for row in local:
        lid = (row.get("id") or "").strip()
        if lid:
            local_by_id[lid.lower()] = row
        name_key = nrm(row.get("name") or "")
        if name_key:
            local_name_count[name_key] = local_name_count.get(name_key, 0) + 1
            local_by_name[name_key] = row

    merged: List[StationRecord] = []
    for st in cloud:
        ip = ""
        local_id = ""
        for candidate in st.get("_match_ids") or []:
            item = local_by_id.get(str(candidate).strip().lower())
            if item:
                ip = str(item.get("ip") or "").strip()
                local_id = str(item.get("id") or "").strip()
                break
        if not ip:
            name_key = nrm(str(st.get("name") or ""))
            # Only trust a name match when exactly one local station has that name.
            if name_key and local_name_count.get(name_key) == 1:
                item = local_by_name.get(name_key) or {}
                ip = str(item.get("ip") or "").strip()
                local_id = str(item.get("id") or "").strip()
        merged.append(
            StationRecord(
                name=str(st.get("name") or "").strip(),
                station_id=str(st.get("id") or "").strip(),
                home=str(st.get("home") or "").strip(),
                room=str(st.get("room") or "").strip(),
                model=normalize_model(str(st.get("model") or "")),
                con_url=ip,
                local_id=local_id,
            )
        )
    return merged


def serialize_stations(stations: List[StationRecord]) -> List[Dict[str, str]]:
    """Convert records to the JSON rows of `stations.json` (`ip` mirrors `con_url`)."""
    return [
        {
            "name": s.name,
            "id": s.station_id,
            "home": s.home,
            "room": s.room,
            "model": s.model,
            "con_url": s.con_url,
            "ip": s.con_url,
            "local_id": s.local_id,
        }
        for s in stations
    ]


def load_stations_records() -> List[StationRecord]:
    """Load `stations.json` as records, skipping malformed rows and rows without an id."""
    rows = load_json(STATIONS_PATH, [])
    if not isinstance(rows, list):
        return []
    out: List[StationRecord] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        record = StationRecord(
            name=str(row.get("name") or "").strip(),
            station_id=str(row.get("id") or row.get("station_id") or "").strip(),
            home=str(row.get("home") or "").strip(),
            room=str(row.get("room") or "").strip(),
            model=str(row.get("model") or "").strip(),
            con_url=str(row.get("con_url") or row.get("ip") or "").strip(),
            local_id=str(row.get("local_id") or "").strip(),
        )
        if record.station_id:
            out.append(record)
    return out


def build_vo_xml(controller_host: str, station_name: str, selector: str) -> str:
    """Build the Loxone virtual-output (VO) template for one station.

    NOTE(review): every markup literal in this function is an empty string in
    the reviewed source, which leaves `base`, `analog_val` and the `mk`
    arguments unused. The XML content looks like it was stripped before
    review - restore it from the upstream file before relying on the output.
    """
    base = f"http://{controller_host}:9124"
    qs = quote(selector, safe="")

    def mk(title: str, path: str, analog: bool = False) -> str:
        analog_val = "true" if analog else "false"
        return (
            f''
        )

    rows = [
        mk("TTS", f"/api/exec?action=tts&station={qs}&text=&volume=35", True),
        mk("Command", f"/api/exec?action=command&station={qs}&text=", True),
        mk("Play", f"/api/exec?action=player&station={qs}&cmd=play"),
        mk("Stop", f"/api/exec?action=player&station={qs}&cmd=stop"),
        mk("Next", f"/api/exec?action=player&station={qs}&cmd=next"),
        mk("Prev", f"/api/exec?action=player&station={qs}&cmd=prev"),
        mk("Volume", f"/api/exec?action=volume&station={qs}&level=", True),
        mk("Raw", f"/api/exec?action=raw&station={qs}&payload=", True),
    ]
    xml = ['']
    xml.append(
        f''
    )
    xml.append('\t')
    xml.extend(["\t" + row for row in rows])
    xml.append("")
    return "\n".join(xml) + "\n"


def build_vi_xml(controller_host: str, station_name: str, selector: str) -> str:
    """Build the Loxone virtual-input (VI) template for one station.

    NOTE(review): the markup literals are empty in the reviewed source, which
    leaves `addr` and `title` unused; the XML content looks stripped - restore
    it from the upstream file.
    """
    addr = f"http://{controller_host}:9124/api/status?station={quote(selector, safe='')}"
    title = html.escape(f"Alice status - {station_name}")
    lines = [
        '',
        f'',
        '\t',
        '\t',
        '\t',
        '\t',
        '\t',
        "",
    ]
    return "\n".join(lines) + "\n"


def build_wb_rules_js(controller_host: str, station_name: str, selector: str) -> str:
    """Render the wb-rules script that exposes one station as a virtual device.

    All Python values are embedded as JSON literals, so they are valid JS
    strings regardless of their content.
    """
    device_id = safe_name(f"alice_station_{station_name}".lower().replace("-", "_"), "alice_station")
    station_json = json.dumps(selector, ensure_ascii=False)
    title_json = json.dumps(f"Alice - {station_name}", ensure_ascii=False)
    base_json = json.dumps(_api_base(controller_host), ensure_ascii=False)
    device_json = json.dumps(device_id, ensure_ascii=False)
    return f"""// Auto-generated template for wb-rules
// Place this file to /etc/wb-rules/ and restart wb-rules.

var STATION_SELECTOR = {station_json};
var API_BASE = {base_json};
var DEVICE_ID = {device_json};
var DEVICE_TITLE = {title_json};

function _apiGet(path, cb) {{
  // encodeURIComponent leaves single quotes as-is; percent-encode them so the
  // URL can never terminate the single-quoted shell argument below.
  var url = String(API_BASE + path).replace(/'/g, "%27");
  runShellCommand("curl -s '" + url + "'", {{
    captureOutput: true,
    exitCallback: function (exitCode, capturedOutput) {{
      if (exitCode !== 0) return cb({{ ok: false, error: "curl_failed" }});
      try {{
        return cb(JSON.parse(capturedOutput || "{{}}"));
      }} catch (e) {{
        return cb({{ ok: false, error: "bad_json", raw: capturedOutput }});
      }}
    }}
  }});
}}

function _enc(v) {{
  return encodeURIComponent(String(v === undefined || v === null ? "" : v));
}}

defineVirtualDevice(DEVICE_ID, {{
  title: DEVICE_TITLE,
  cells: {{
    daemon_ok: {{ title: {{ en: "Daemon OK", ru: "Демон OK" }}, type: "switch", value: false, readonly: true, order: 10 }},
    ws_running: {{ title: {{ en: "WS Running", ru: "WS к колонке" }}, type: "switch", value: false, readonly: true, order: 20 }},
    playing: {{ title: {{ en: "Playing", ru: "Воспроизведение" }}, type: "switch", value: false, readonly: true, order: 30 }},
    current_volume: {{ title: {{ en: "Current Volume", ru: "Текущая громкость" }}, type: "value", value: 0, readonly: true, order: 40 }},
    tts_text: {{ title: {{ en: "TTS Text", ru: "Текст TTS" }}, type: "text", value: "", readonly: false, order: 100 }},
    tts_send: {{ title: {{ en: "Send TTS", ru: "Отправить TTS" }}, type: "pushbutton", value: false, order: 110 }},
    command_text: {{ title: {{ en: "Command Text", ru: "Текст команды" }}, type: "text", value: "", readonly: false, order: 120 }},
    command_send: {{ title: {{ en: "Send Command", ru: "Отправить команду" }}, type: "pushbutton", value: false, order: 130 }},
    volume: {{ title: {{ en: "Volume", ru: "Громкость" }}, type: "range", value: 35, min: 0, max: 100, order: 200 }},
    raw_json: {{ title: {{ en: "Raw JSON", ru: "RAW JSON" }}, type: "text", value: "{{\\"command\\":\\"stop\\"}}", readonly: false, order: 220 }},
    raw_send: {{ title: {{ en: "Send RAW", ru: "Отправить RAW" }}, type: "pushbutton", value: false, order: 230 }},
    play: {{ title: {{ en: "Play", ru: "Play" }}, type: "pushbutton", value: false, order: 300 }},
    stop: {{ title: {{ en: "Stop", ru: "Stop" }}, type: "pushbutton", value: false, order: 310 }},
    next: {{ title: {{ en: "Next", ru: "Next" }}, type: "pushbutton", value: false, order: 320 }},
    prev: {{ title: {{ en: "Prev", ru: "Prev" }}, type: "pushbutton", value: false, order: 330 }},
    status_json: {{ title: {{ en: "Status JSON", ru: "Статус JSON" }}, type: "text", value: "{{}}", readonly: true, order: 400 }}
  }}
}});

defineRule(DEVICE_ID + "_tts_send", {{
  whenChanged: DEVICE_ID + "/tts_send",
  then: function (newValue) {{
    if (!newValue) return;
    var txt = String(dev[DEVICE_ID + "/tts_text"] || "").trim();
    if (!txt) return;
    _apiGet("/api/exec?action=tts&station=" + _enc(STATION_SELECTOR) + "&text=" + _enc(txt) + "&volume=35", function(){{}});
  }}
}});

defineRule(DEVICE_ID + "_command_send", {{
  whenChanged: DEVICE_ID + "/command_send",
  then: function (newValue) {{
    if (!newValue) return;
    var txt = String(dev[DEVICE_ID + "/command_text"] || "").trim();
    if (!txt) return;
    _apiGet("/api/exec?action=command&station=" + _enc(STATION_SELECTOR) + "&text=" + _enc(txt), function(){{}});
  }}
}});

defineRule(DEVICE_ID + "_volume_control", {{
  whenChanged: DEVICE_ID + "/volume",
  then: function () {{
    var level = parseInt(dev[DEVICE_ID + "/volume"], 10);
    if (isNaN(level)) level = 35;
    if (level < 0) level = 0;
    if (level > 100) level = 100;
    _apiGet("/api/exec?action=volume&station=" + _enc(STATION_SELECTOR) + "&level=" + _enc(level), function(){{}});
  }}
}});

function _player(cmd) {{
  return function (newValue) {{
    if (!newValue) return;
    _apiGet("/api/exec?action=player&station=" + _enc(STATION_SELECTOR) + "&cmd=" + _enc(cmd), function(){{}});
  }};
}}

defineRule(DEVICE_ID + "_play", {{ whenChanged: DEVICE_ID + "/play", then: _player("play") }});
defineRule(DEVICE_ID + "_stop", {{ whenChanged: DEVICE_ID + "/stop", then: _player("stop") }});
defineRule(DEVICE_ID + "_next", {{ whenChanged: DEVICE_ID + "/next", then: _player("next") }});
defineRule(DEVICE_ID + "_prev", {{ whenChanged: DEVICE_ID + "/prev", then: _player("prev") }});

defineRule(DEVICE_ID + "_raw_send", {{
  whenChanged: DEVICE_ID + "/raw_send",
  then: function (newValue) {{
    if (!newValue) return;
    var raw = String(dev[DEVICE_ID + "/raw_json"] || "").trim();
    if (!raw) return;
    _apiGet("/api/exec?action=raw&station=" + _enc(STATION_SELECTOR) + "&payload=" + _enc(raw), function(){{}});
  }}
}});

defineRule(DEVICE_ID + "_status_poll", {{
  when: cron("@every 10s"),
  then: function () {{
    _apiGet("/api/status?station=" + _enc(STATION_SELECTOR), function (j) {{
      var ok = Number(j && j.ok ? 1 : 0);
      var running = Number(j && j.running ? 1 : 0);
      dev[DEVICE_ID + "/daemon_ok"] = !!ok;
      dev[DEVICE_ID + "/ws_running"] = !!running;
      if (j && j.playing !== undefined && j.playing !== null) dev[DEVICE_ID + "/playing"] = !!Number(j.playing ? 1 : 0);
      if (j && j.volume !== undefined && j.volume !== null) dev[DEVICE_ID + "/current_volume"] = Number(j.volume) || 0;
      dev[DEVICE_ID + "/status_json"] = JSON.stringify(j || {{}});
    }});
  }}
}});
"""


def write_loxone_templates(station: StationRecord, controller_host: str) -> Dict[str, str]:
    """Write VO.xml, VI.xml and README.md for *station*; return their relative paths."""
    station_folder = _station_folder(LOXONE_DIR, station)
    station_folder.mkdir(parents=True, exist_ok=True)
    selector = station.selector()
    display_name = station.name or station.station_id
    vo_path = station_folder / "VO.xml"
    vi_path = station_folder / "VI.xml"
    readme_path = station_folder / "README.md"
    vo_path.write_text(build_vo_xml(controller_host, display_name, selector), encoding="utf-8")
    vi_path.write_text(build_vi_xml(controller_host, display_name, selector), encoding="utf-8")
    readme_path.write_text(
        "# Alice templates\n\n"
        f"- Station: {station.name}\n"
        f"- Station ID: {station.station_id}\n"
        f"- Selector: {selector}\n"
        f"- API: {_api_base(controller_host)}\n",
        encoding="utf-8",
    )
    return {
        "station": station.station_id,
        "name": station.name,
        "folder": str(station_folder.relative_to(BASE_DIR)),
        "vo": str(vo_path.relative_to(BASE_DIR)),
        "vi": str(vi_path.relative_to(BASE_DIR)),
    }


def write_wb_rules_templates(station: StationRecord, controller_host: str) -> Dict[str, str]:
    """Write wb-rules-station.js and README.md for *station*; return their relative paths."""
    station_folder = _station_folder(WB_RULES_DIR, station)
    station_folder.mkdir(parents=True, exist_ok=True)
    selector = station.selector()
    wb_path = station_folder / "wb-rules-station.js"
    readme_path = station_folder / "README.md"
    wb_path.write_text(
        build_wb_rules_js(controller_host, station.name or station.station_id, selector),
        encoding="utf-8",
    )
    readme_path.write_text(
        "# Alice wb-rules templates\n\n"
        f"- Station: {station.name}\n"
        f"- Station ID: {station.station_id}\n"
        f"- Selector: {selector}\n"
        f"- API: {_api_base(controller_host)}\n",
        encoding="utf-8",
    )
    return {
        "station": station.station_id,
        "name": station.name,
        "folder": str(station_folder.relative_to(BASE_DIR)),
        "wb": str(wb_path.relative_to(BASE_DIR)),
    }


def generate_templates(template_kind: str, controller_host: str) -> Dict[str, Any]:
    """Generate templates and a zip archive for the chosen platform.

    Args:
        template_kind: ``"loxone"`` or ``"wb-rules"``.
        controller_host: host/IP embedded into the generated API links.

    Raises:
        RuntimeError: when `stations.json` has no stations or the kind is unknown.
    """
    stations = load_stations_records()
    if not stations:
        raise RuntimeError("stations.json is empty, run station refresh first")
    ensure_dirs()

    if template_kind == "loxone":
        writer = write_loxone_templates
    elif template_kind == "wb-rules":
        writer = write_wb_rules_templates
    else:
        raise RuntimeError(f"unsupported template kind: {template_kind}")
    target_dir = _TEMPLATE_DIRS[template_kind]
    file_names = _TEMPLATE_FILES[template_kind]

    result: List[Dict[str, str]] = [writer(station, controller_host) for station in stations]

    zip_path = target_dir / _ZIP_NAMES[template_kind]
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        archived = set()
        for station in stations:
            folder = _station_folder(target_dir, station)
            for fn in file_names:
                arcname = f"{folder.name}/{fn}"
                src = folder / fn
                # Stations whose names sanitize to the same folder share files;
                # add each archive member only once.
                if arcname in archived or not src.exists():
                    continue
                zf.write(src, arcname=arcname)
                archived.add(arcname)
    return {"ok": True, "templates": result, "zip": str(zip_path.relative_to(BASE_DIR))}


def is_wiren_board() -> bool:
    """Heuristic: True when `/etc/wb-release` or `/etc/wb-rules` exists."""
    return Path("/etc/wb-release").exists() or Path("/etc/wb-rules").exists()


def install_wb_rules(station_id: Optional[str] = None) -> Dict[str, Any]:
    """Copy prepared wb-rules scripts to `/etc/wb-rules` and restart wb-rules.

    When direct write access is missing, ``sudo -n`` (non-interactive) is used
    so the function stays usable from web/CLI automation.

    Args:
        station_id: install only this station; all stations when ``None``/empty.

    Raises:
        RuntimeError: `/etc/wb-rules` is missing or the station is unknown.
        subprocess.CalledProcessError: a sudo/systemctl fallback failed.
    """
    target_dir = Path("/etc/wb-rules")
    if not target_dir.exists():
        raise RuntimeError("/etc/wb-rules not found")

    stations = load_stations_records()
    if station_id:
        stations = [s for s in stations if s.station_id == station_id]
        if not stations:
            raise RuntimeError(f"station not found: {station_id}")

    installed: List[str] = []
    for station in stations:
        src = _station_folder(WB_RULES_DIR, station) / "wb-rules-station.js"
        if not src.exists():
            # Templates were never generated for this station: create them now.
            write_wb_rules_templates(station, detect_controller_host())
        dst = target_dir / f"alice_station_{safe_name(station.station_id, 'station')}.js"
        try:
            shutil.copy2(src, dst)
        except PermissionError:
            subprocess.run(["sudo", "-n", "install", "-m", "0644", str(src), str(dst)], check=True)
        installed.append(str(dst))

    try:
        subprocess.run(["systemctl", "restart", "wb-rules"], check=True)
    except (OSError, subprocess.CalledProcessError):
        # Not privileged enough (or systemctl not runnable directly): retry via sudo.
        subprocess.run(["sudo", "-n", "systemctl", "restart", "wb-rules"], check=True)
    return {"ok": True, "installed": installed}


async def refresh_stations(token: str, timeout: float = 3.0) -> Dict[str, Any]:
    """Rebuild `stations.json` from the cloud plus the local network.

    Raises:
        RuntimeError: when the token is empty or the cloud request fails.
    """
    token = token.strip()
    if not token:
        raise RuntimeError("token is required")
    try:
        cloud = await fetch_cloud_stations(token)
    except Exception as exc:
        # Give the most common failure an actionable message; re-raise the rest.
        if "AUTH_TOKEN_INVALID" in str(exc):
            raise RuntimeError("YANDEX_OAUTH_TOKEN invalid: update token and retry") from exc
        raise
    local = await discover_local(timeout=timeout)
    payload = serialize_stations(merge_cloud_with_local(cloud, local))
    save_json(STATIONS_PATH, payload)
    return {"ok": True, "stations": payload, "cloud_total": len(cloud), "local_total": len(local)}


async def _read_json_object(request: web.Request) -> Optional[Dict[str, Any]]:
    """Return the request body as a JSON object, or ``None`` if it is not one."""
    try:
        body = await request.json()
    except (ValueError, LookupError):
        # ValueError: invalid JSON / bad encoding; LookupError: unknown charset.
        return None
    return body if isinstance(body, dict) else None


def _json_error(message: str, status: int) -> web.Response:
    """Uniform ``{"ok": false, "error": ...}`` response."""
    return web.json_response({"ok": False, "error": message}, status=status)


async def handle_index(_: web.Request) -> web.StreamResponse:
    """Serve the web UI entry page."""
    return web.FileResponse(BASE_DIR / "web" / "index.html")


async def handle_static(request: web.Request) -> web.StreamResponse:
    """Serve a file from the `web/` directory; 404 for anything outside of it."""
    web_root = (BASE_DIR / "web").resolve()
    name = request.match_info.get("name", "")
    try:
        target = (web_root / name).resolve()
        # Reject "..", absolute paths and percent-encoded separators that
        # would escape the web root (e.g. to reach token.txt).
        target.relative_to(web_root)
    except (OSError, ValueError):
        raise web.HTTPNotFound() from None
    if not target.is_file():
        raise web.HTTPNotFound()
    return web.FileResponse(target)


async def api_status(_: web.Request) -> web.Response:
    """Summary status for the web UI: token, stations, environment, config."""
    stations = load_json(STATIONS_PATH, [])
    cfg = _load_config()
    return web.json_response(
        {
            "ok": True,
            "token_set": bool(read_token("")),
            "controller_host": detect_controller_host(),
            "stations": stations,
            "is_wiren_board": is_wiren_board(),
            "platform": platform.platform(),
            "config": {"controller_host": str(cfg.get("controller_host") or "")},
        }
    )


async def api_set_token(request: web.Request) -> web.Response:
    """Store the OAuth token sent as ``{"token": "..."}``."""
    body = await _read_json_object(request)
    if body is None:
        return _json_error("invalid json body", 400)
    token = str(body.get("token") or "").strip()
    if not token:
        return _json_error("token is required", 422)
    save_token(token)
    return web.json_response({"ok": True})


async def api_set_host(request: web.Request) -> web.Response:
    """Store the controller host sent as ``{"controller_host": "..."}``."""
    body = await _read_json_object(request)
    if body is None:
        return _json_error("invalid json body", 400)
    host = str(body.get("controller_host") or "").strip()
    if not host:
        return _json_error("controller_host is required", 422)
    save_controller_host(host)
    # Report the effective host: a loopback override is ignored by detection.
    return web.json_response({"ok": True, "controller_host": detect_controller_host()})


async def api_refresh(_: web.Request) -> web.Response:
    """Rebuild the station list and return merge diagnostics."""
    try:
        out = await refresh_stations(read_token(""))
    except Exception as exc:
        return _json_error(str(exc), 422)
    return web.json_response(out)


async def api_templates_loxone(_: web.Request) -> web.Response:
    """Generate Loxone templates for all known stations."""
    try:
        out = generate_templates("loxone", detect_controller_host())
    except Exception as exc:
        return _json_error(str(exc), 422)
    return web.json_response(out)


async def api_templates_wb(_: web.Request) -> web.Response:
    """Generate wb-rules templates for all known stations."""
    try:
        out = generate_templates("wb-rules", detect_controller_host())
    except Exception as exc:
        return _json_error(str(exc), 422)
    return web.json_response(out)


async def api_download(request: web.Request) -> web.StreamResponse:
    """Send the template archive for ``{kind}``, generating it on first use."""
    kind = request.match_info.get("kind", "")
    if kind not in _ZIP_NAMES:
        raise web.HTTPNotFound()
    zip_path = _TEMPLATE_DIRS[kind] / _ZIP_NAMES[kind]
    if not zip_path.exists():
        try:
            generate_templates(kind, detect_controller_host())
        except RuntimeError as exc:
            # e.g. no stations yet: answer like the other endpoints, not with a 500.
            return _json_error(str(exc), 422)
    return web.FileResponse(zip_path)


async def api_install_wb(request: web.Request) -> web.Response:
    """Install wb-rules scripts (optionally for one ``station_id``) on a Wiren Board."""
    if not is_wiren_board():
        return _json_error("not a Wiren Board environment", 422)
    # A missing or invalid body means "install every station".
    body = await _read_json_object(request) or {}
    station_id = str(body.get("station_id") or "").strip() or None
    try:
        # Copying files and restarting a service blocks; keep it off the event loop.
        loop = asyncio.get_running_loop()
        out = await loop.run_in_executor(None, install_wb_rules, station_id)
    except Exception as exc:
        return _json_error(str(exc), 422)
    return web.json_response(out)
async def run_web(host: str, port: int) -> int:
    """Start the plugin's web UI and REST API and serve until cancelled.

    The coroutine only ends through cancellation (e.g. Ctrl-C under
    `asyncio.run`); the runner is cleaned up on the way out so the listening
    socket is released.
    """
    app = web.Application()
    app.router.add_get("/", handle_index)
    app.router.add_get("/web/{name}", handle_static)
    app.router.add_get("/api/status", api_status)
    app.router.add_post("/api/token", api_set_token)
    app.router.add_post("/api/controller-host", api_set_host)
    app.router.add_post("/api/stations/refresh", api_refresh)
    app.router.add_post("/api/templates/loxone", api_templates_loxone)
    app.router.add_post("/api/templates/wb-rules", api_templates_wb)
    app.router.add_get("/api/download/{kind}", api_download)
    app.router.add_post("/api/wb-rules/install", api_install_wb)

    runner = web.AppRunner(app)
    await runner.setup()
    try:
        site = web.TCPSite(runner, host=host, port=port)
        await site.start()
        print(f"alice plugin web listening on http://{host}:{port}")
        # Nothing else to do here: keep the coroutine alive while the site serves.
        while True:
            await asyncio.sleep(3600)
    finally:
        await runner.cleanup()


def build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser: a global ``--token`` plus one sub-command per scenario."""
    p = argparse.ArgumentParser(description="Standalone Alice stations plugin")
    p.add_argument("--token", default="", help="Yandex OAuth token")
    sub = p.add_subparsers(dest="cmd", required=True)

    sp = sub.add_parser("stations-refresh", help="Обновить список станций")
    sp.add_argument("--timeout", type=float, default=3.0, help="Секунды ожидания локального mDNS-поиска")

    sp = sub.add_parser("templates-loxone", help="Сгенерировать шаблоны Loxone в ./configs/loxone")
    sp.add_argument("--controller-host", default="", help="IP или host контроллера для ссылок на API")

    sp = sub.add_parser("templates-wb-rules", help="Сгенерировать шаблоны wb-rules в ./configs/wb-rules")
    sp.add_argument("--controller-host", default="", help="IP или host контроллера для ссылок на API")

    sp = sub.add_parser("wb-install", help="Установить wb-rules шаблоны в /etc/wb-rules")
    sp.add_argument("--station-id", default="", help="Установить только одну станцию по station id")

    sp = sub.add_parser("web", help="Запустить web UI")
    sp.add_argument("--host", default="0.0.0.0", help="Хост для web UI")
    sp.add_argument("--port", type=int, default=9140, help="Порт для web UI")
    return p


def _print_json(payload, stream=None) -> None:
    """Pretty-print *payload* as JSON (non-ASCII kept readable) to *stream* or stdout."""
    print(json.dumps(payload, ensure_ascii=False, indent=2), file=stream)


# CLI command name -> template kind understood by generate_templates().
_TEMPLATE_COMMANDS = {"templates-loxone": "loxone", "templates-wb-rules": "wb-rules"}


async def main_async() -> int:
    """Route CLI commands.

    Principle:
    - each command runs exactly one scenario;
    - JSON output is convenient for automation and diagnostics.

    Returns the process exit code.
    """
    ensure_dirs()
    parser = build_parser()
    args = parser.parse_args()

    if args.cmd == "stations-refresh":
        token = read_token(args.token)
        _print_json(await refresh_stations(token, timeout=args.timeout))
        return 0

    if args.cmd in _TEMPLATE_COMMANDS:
        # An explicit --controller-host wins over config/env/auto-detection.
        host = (args.controller_host or "").strip() or detect_controller_host()
        _print_json(generate_templates(_TEMPLATE_COMMANDS[args.cmd], host))
        return 0

    if args.cmd == "wb-install":
        _print_json(install_wb_rules(args.station_id.strip() or None))
        return 0

    if args.cmd == "web":
        token = (args.token or "").strip()
        if token:
            # Persist a token passed on the command line so the web API can use it.
            save_token(token)
        return await run_web(args.host, args.port)

    parser.print_help()
    return 1


def main() -> int:
    """Synchronous entry point; returns the process exit code.

    Domain failures (raised as `RuntimeError` throughout the module) are
    reported as a JSON error object on stderr with exit code 1 instead of a
    traceback; Ctrl-C maps to 130.
    """
    try:
        return asyncio.run(main_async())
    except KeyboardInterrupt:
        return 130
    except RuntimeError as exc:
        _print_json({"ok": False, "error": str(exc)}, stream=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())