#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Standalone Alice stations plugin.

Provides a CLI and a small aiohttp web UI that:

* fetch the speaker list from the Yandex IoT cloud API and merge it with
  stations discovered on the local network,
* generate Loxone (VO/VI XML) and wb-rules (JavaScript) templates per station,
* optionally install the wb-rules templates into ``/etc/wb-rules``.
"""

import argparse
import asyncio
import html
import ipaddress
import json
import os
import platform
import shutil
import socket
import subprocess
import sys
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote, urlparse

import aiohttp
from aiohttp import web

import yastation

BASE_DIR = Path(__file__).resolve().parent
DATA_DIR = BASE_DIR / "data"
LOXONE_DIR = BASE_DIR / "loxone"
WEB_DIR = BASE_DIR / "web"
STATIONS_PATH = DATA_DIR / "stations.json"
CONFIG_PATH = DATA_DIR / "config.json"
TOKEN_PATH = BASE_DIR / "token.txt"
YANDEX_INFO_URL = "https://api.iot.yandex.net/v1.0/user/info"
# Port that the generated templates use to reach the controller API.
CONTROLLER_API_PORT = 9124


@dataclass
class StationRecord:
    """One station as stored in ``stations.json`` (cloud data plus local match)."""

    name: str
    station_id: str
    home: str
    room: str
    model: str
    con_url: str
    local_id: str

    def selector(self) -> str:
        """Return the value used to address this station in API URLs.

        Preference order: the host parsed from ``con_url``, then ``local_id``,
        then ``station_id``.
        """
        host = ""
        value = (self.con_url or "").strip()
        if value:
            # urlparse only fills ``hostname`` when a scheme is present.
            if not value.startswith(("http://", "https://")):
                value = f"http://{value}"
            try:
                host = (urlparse(value).hostname or "").strip()
            except ValueError:
                host = ""
        return host or self.local_id or self.station_id


def ensure_dirs() -> None:
    """Create the data and template output directories if they are missing."""
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    LOXONE_DIR.mkdir(parents=True, exist_ok=True)


def safe_name(value: str, fallback: str) -> str:
    """Return ``value`` reduced to alphanumerics and ``._-`` for use in file names.

    Every other character becomes ``_``; leading/trailing ``._-`` are stripped.
    ``fallback`` is returned when nothing is left.
    """
    out = "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in (value or "").strip())
    out = out.strip("._-")
    return out or fallback


def load_json(path: Path, default: Any) -> Any:
    """Read and parse a UTF-8 JSON file, returning ``default`` if it is missing or unreadable."""
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Missing file, I/O error, bad encoding or malformed JSON.
        return default


def save_json(path: Path, payload: Any) -> None:
    """Write ``payload`` to ``path`` as pretty-printed UTF-8 JSON with a trailing newline."""
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")


def _load_config() -> Dict[str, Any]:
    """Return the contents of ``config.json`` as a dict (empty if missing or not an object)."""
    cfg = load_json(CONFIG_PATH, {})
    return cfg if isinstance(cfg, dict) else {}


def read_token(cli_token: str) -> str:
    """Resolve the OAuth token.

    Sources are tried in order: ``cli_token``, the ``YANDEX_TOKEN`` environment
    variable, ``token.txt``, then the ``token`` key of ``config.json``.
    Returns an empty string when none is set.
    """
    token = (cli_token or "").strip()
    if token:
        return token
    env_token = os.environ.get("YANDEX_TOKEN", "").strip()
    if env_token:
        return env_token
    if TOKEN_PATH.exists():
        return TOKEN_PATH.read_text(encoding="utf-8").strip()
    return str(_load_config().get("token") or "").strip()


def save_token(token: str) -> None:
    """Persist the token to ``token.txt`` (mode 0600) and to ``config.json``."""
    token = token.strip()
    # Create the file owner-only from the start so the secret is never
    # readable by others between the write and the chmod.
    fd = os.open(str(TOKEN_PATH), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        fh.write(token + "\n")
    # The mode passed to os.open only applies to newly created files.
    os.chmod(TOKEN_PATH, 0o600)
    cfg = _load_config()
    cfg["token"] = token
    save_json(CONFIG_PATH, cfg)


def save_controller_host(host: str) -> None:
    """Store the controller host override in ``config.json``."""
    cfg = _load_config()
    cfg["controller_host"] = host.strip()
    save_json(CONFIG_PATH, cfg)


def _is_valid_local_ip(value: str) -> bool:
    """Return True for an IPv4 address that is not loopback, link-local or unspecified."""
    try:
        ip = ipaddress.ip_address(value)
    except ValueError:
        return False
    return bool(ip.version == 4 and not ip.is_loopback and not ip.is_link_local and not ip.is_unspecified)


def _is_loopback_host(value: str) -> bool:
    """Return True when ``value`` is ``localhost`` or starts with ``127.``."""
    host = (value or "").strip().lower()
    if not host:
        return False
    if host == "localhost":
        return True
    return host.startswith("127.")


def _detect_local_ip() -> str:
    """Best-effort detection of the primary LAN address; falls back to ``127.0.0.1``."""
    # 1) Ask the OS which source address it would use for an outbound UDP socket.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("1.1.1.1", 80))
            ip = str(s.getsockname()[0] or "").strip()
            if _is_valid_local_ip(ip):
                return ip
    except (OSError, ValueError):
        pass
    # 2) Resolve our own hostname via getaddrinfo.
    try:
        host = socket.gethostname()
        for family, _, _, _, sockaddr in socket.getaddrinfo(host, None, socket.AF_INET):
            if family != socket.AF_INET or not sockaddr:
                continue
            ip = str(sockaddr[0] or "").strip()
            if _is_valid_local_ip(ip):
                return ip
    except (OSError, ValueError):
        pass
    # 3) Same, through gethostbyname_ex.
    try:
        _, _, ips = socket.gethostbyname_ex(socket.gethostname())
        for raw_ip in ips:
            ip = str(raw_ip).strip()
            if _is_valid_local_ip(ip):
                return ip
    except (OSError, ValueError):
        pass
    return "127.0.0.1"


def detect_controller_host() -> str:
    """Return the host that generated templates should call.

    A non-loopback ``controller_host`` from ``config.json`` wins, then a
    non-loopback ``ALICE_CONTROLLER_HOST`` environment variable, then the
    auto-detected LAN address.
    """
    cfg_host = str(_load_config().get("controller_host") or "").strip()
    if cfg_host and not _is_loopback_host(cfg_host):
        return cfg_host
    env_host = os.environ.get("ALICE_CONTROLLER_HOST", "").strip()
    if env_host and not _is_loopback_host(env_host):
        return env_host
    return _detect_local_ip()


def normalize_model(model: str) -> str:
    """Strip the ``devices.types.smart_speaker.`` prefix (case-insensitive) from a model string."""
    value = (model or "").strip()
    prefix = "devices.types.smart_speaker."
    if value.lower().startswith(prefix):
        return value[len(prefix):]
    return value


def _extract_match_ids(device: Dict[str, Any]) -> List[str]:
    """Collect the distinct, lower-cased identifiers of a cloud device used for local matching."""
    quasar = device.get("quasar_info") or {}
    candidates = (
        device.get("id"),
        device.get("device_id"),
        device.get("external_id"),
        quasar.get("device_id"),
        quasar.get("deviceId"),
        quasar.get("external_id"),
        quasar.get("serial"),
        device.get("serial"),
    )
    normalized = (str(c or "").strip().lower() for c in candidates)
    # dict.fromkeys removes duplicates while keeping first-seen order.
    return list(dict.fromkeys(n for n in normalized if n))


def _parse_cloud_stations(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Extract speaker devices (with home/room names) from a parsed user-info response."""
    homes: Dict[str, str] = {}
    for home in data.get("homes") or []:
        hid = str(home.get("id") or "").strip()
        if hid:
            homes[hid] = str(home.get("name") or "").strip()
    # "households" only fills in ids that "homes" did not provide.
    for home in data.get("households") or []:
        hid = str(home.get("id") or "").strip()
        if hid and hid not in homes:
            homes[hid] = str(home.get("name") or "").strip()

    rooms: Dict[str, str] = {}
    room_home: Dict[str, str] = {}
    for room in data.get("rooms") or []:
        rid = str(room.get("id") or "").strip()
        if not rid:
            continue
        rooms[rid] = str(room.get("name") or "").strip()
        home_id = str(room.get("home_id") or room.get("household_id") or room.get("household") or "").strip()
        if home_id and home_id in homes:
            room_home[rid] = homes[home_id]

    result: List[Dict[str, Any]] = []
    for dev in data.get("devices") or []:
        dtype = str(dev.get("type") or "").lower()
        if "speaker" not in dtype:
            continue
        room_id = str(dev.get("room") or "").strip()
        home_id = str(dev.get("home_id") or dev.get("household_id") or dev.get("household") or "").strip()
        # Prefer the device's own home; otherwise the home of its room.
        home_name = homes.get(home_id, room_home.get(room_id, ""))
        result.append(
            {
                "name": str(dev.get("name") or "").strip(),
                "id": str(dev.get("id") or "").strip(),
                "home": home_name,
                "room": rooms.get(room_id, ""),
                "model": normalize_model(str(dev.get("model") or dev.get("type") or "")),
                "_match_ids": _extract_match_ids(dev),
            }
        )
    return result


async def fetch_cloud_stations(token: str) -> List[Dict[str, Any]]:
    """Fetch the account's devices from the Yandex IoT API and return the speakers.

    Raises:
        RuntimeError: on an HTTP status >= 400 or a response that is not a JSON object.
    """
    headers = {"Authorization": f"OAuth {token}"}
    timeout = aiohttp.ClientTimeout(total=20)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(YANDEX_INFO_URL, headers=headers) as resp:
            text = await resp.text()
            if resp.status >= 400:
                raise RuntimeError(f"Yandex API error {resp.status}: {text[:220]}")
    try:
        data = json.loads(text)
    except ValueError as exc:
        raise RuntimeError(f"Bad Yandex API JSON: {exc}") from exc
    if not isinstance(data, dict):
        raise RuntimeError(f"Bad Yandex API JSON: expected object, got {type(data).__name__}")
    return _parse_cloud_stations(data)


async def discover_local(timeout: float = 3.0) -> List[Dict[str, str]]:
    """Discover stations via ``yastation.discover`` and return them as plain dicts."""
    items = await yastation.discover(timeout)
    return [{"id": st.device_id, "name": st.name, "ip": st.host, "platform": st.platform} for st in items]


def merge_cloud_with_local(cloud: List[Dict[str, Any]], local: List[Dict[str, str]]) -> List[StationRecord]:
    """Attach a local IP/id to each cloud station.

    Matching order per cloud station: any of its ``_match_ids`` against the
    local device ids; then a name match when that name is unique among local
    devices; then, if exactly one local device was found, that device.
    """

    def nrm(value: str) -> str:
        # Collapse whitespace runs and lower-case for name comparison.
        return " ".join((value or "").split()).lower()

    local_by_id: Dict[str, Dict[str, str]] = {}
    local_by_name: Dict[str, Dict[str, str]] = {}
    local_name_count: Dict[str, int] = {}
    for row in local:
        lid = (row.get("id") or "").strip()
        if lid:
            local_by_id[lid.lower()] = row
        name_key = nrm(row.get("name") or "")
        if name_key:
            local_name_count[name_key] = local_name_count.get(name_key, 0) + 1
            local_by_name[name_key] = row

    merged: List[StationRecord] = []
    for st in cloud:
        ip = ""
        local_id = ""
        for candidate in st.get("_match_ids") or []:
            item = local_by_id.get(str(candidate).strip().lower())
            if item:
                ip = str(item.get("ip") or "").strip()
                local_id = str(item.get("id") or "").strip()
                break
        if not ip:
            name_key = nrm(str(st.get("name") or ""))
            # Only trust a name match when the name is unambiguous locally.
            if name_key and local_name_count.get(name_key) == 1:
                item = local_by_name.get(name_key) or {}
                ip = str(item.get("ip") or "").strip()
                local_id = str(item.get("id") or "").strip()
        if not ip and len(local) == 1:
            # NOTE(review): with several cloud stations and one local device,
            # every unmatched cloud station receives the same IP - confirm
            # this is the intended heuristic.
            ip = str(local[0].get("ip") or "").strip()
            local_id = str(local[0].get("id") or "").strip()
        merged.append(
            StationRecord(
                name=str(st.get("name") or "").strip(),
                station_id=str(st.get("id") or "").strip(),
                home=str(st.get("home") or "").strip(),
                room=str(st.get("room") or "").strip(),
                model=normalize_model(str(st.get("model") or "")),
                con_url=ip if ip else "",
                local_id=local_id,
            )
        )
    return merged


def serialize_stations(stations: List[StationRecord]) -> List[Dict[str, str]]:
    """Convert records to the dict layout stored in ``stations.json``.

    ``con_url`` is written under both ``con_url`` and ``ip``.
    """
    return [
        {
            "name": s.name,
            "id": s.station_id,
            "home": s.home,
            "room": s.room,
            "model": s.model,
            "con_url": s.con_url,
            "ip": s.con_url,
            "local_id": s.local_id,
        }
        for s in stations
    ]


def load_stations_records() -> List[StationRecord]:
    """Load ``stations.json`` into records, dropping entries without a station id."""
    rows = load_json(STATIONS_PATH, [])
    if not isinstance(rows, list):
        return []
    out: List[StationRecord] = []
    for row in rows:
        if not isinstance(row, dict):
            continue  # tolerate hand-edited / corrupted entries
        out.append(
            StationRecord(
                name=str(row.get("name") or "").strip(),
                station_id=str(row.get("id") or row.get("station_id") or "").strip(),
                home=str(row.get("home") or "").strip(),
                room=str(row.get("room") or "").strip(),
                model=str(row.get("model") or "").strip(),
                con_url=str(row.get("con_url") or row.get("ip") or "").strip(),
                local_id=str(row.get("local_id") or "").strip(),
            )
        )
    return [s for s in out if s.station_id]


def build_vo_xml(controller_host: str, station_name: str, selector: str) -> str:
    """Build the Loxone virtual-output template text for one station.

    NOTE(review): every XML string literal in this function is empty and
    ``base``, ``title``, ``path`` and ``analog_val`` are never interpolated -
    the markup looks like it was stripped from the source. The literals are
    kept as found; restore them from the original template.
    """
    base = f"http://{controller_host}:{CONTROLLER_API_PORT}"
    qs = quote(selector, safe="")

    def mk(title: str, path: str, analog: bool = False) -> str:
        analog_val = "true" if analog else "false"
        return (
            f''
        )

    rows = [
        mk("TTS", f"/api/exec?action=tts&station={qs}&text=&volume=35", True),
        mk("Command", f"/api/exec?action=command&station={qs}&text=", True),
        mk("Play", f"/api/exec?action=player&station={qs}&cmd=play"),
        mk("Pause", f"/api/exec?action=player&station={qs}&cmd=pause"),
        mk("Stop", f"/api/exec?action=player&station={qs}&cmd=stop"),
        mk("Next", f"/api/exec?action=player&station={qs}&cmd=next"),
        mk("Prev", f"/api/exec?action=player&station={qs}&cmd=prev"),
        mk("Volume", f"/api/exec?action=volume&station={qs}&level=", True),
        mk("Raw", f"/api/exec?action=raw&station={qs}&payload=", True),
    ]
    xml = ['']
    xml.append(
        f''
    )
    xml.append('\t')
    xml.extend(["\t" + row for row in rows])
    xml.append("")
    return "\n".join(xml) + "\n"


def build_vi_xml(controller_host: str, station_name: str, selector: str) -> str:
    """Build the Loxone virtual-input (status polling) template text for one station.

    NOTE(review): the XML string literals are empty and ``addr``/``title`` are
    unused - the markup looks stripped from the source. Kept as found.
    """
    addr = f"http://{controller_host}:{CONTROLLER_API_PORT}/api/status?station={quote(selector, safe='')}"
    title = html.escape(f"Alice status - {station_name}")
    lines = [
        '',
        f'',
        '\t',
        '\t',
        '\t',
        '\t',
        '\t',
        "",
    ]
    return "\n".join(lines) + "\n"


def build_wb_rules_js(controller_host: str, station_name: str, selector: str) -> str:
    """Render the wb-rules JavaScript for one station.

    The script defines a virtual device with TTS / command / volume / player /
    raw controls that call the controller HTTP API through ``curl``, and polls
    ``/api/status`` every 10 seconds.
    """
    device_id = safe_name(f"alice_station_{station_name}".lower().replace("-", "_"), "alice_station")
    # json.dumps yields valid JavaScript string literals for the embedded values.
    station_json = json.dumps(selector, ensure_ascii=False)
    title_json = json.dumps(f"Alice - {station_name}", ensure_ascii=False)
    base_json = json.dumps(f"http://{controller_host}:{CONTROLLER_API_PORT}", ensure_ascii=False)
    device_json = json.dumps(device_id, ensure_ascii=False)
    # In the template below, ``_enc`` additionally escapes "'" because
    # encodeURIComponent leaves it as-is and ``_apiGet`` wraps the URL in
    # single quotes for the shell.
    return f"""// Auto-generated template for wb-rules
// Place this file to /etc/wb-rules/ and restart wb-rules.

var STATION_SELECTOR = {station_json};
var API_BASE = {base_json};
var DEVICE_ID = {device_json};
var DEVICE_TITLE = {title_json};

function _apiGet(path, cb) {{
  runShellCommand("curl -s '" + API_BASE + path + "'", {{
    captureOutput: true,
    exitCallback: function (exitCode, capturedOutput) {{
      if (exitCode !== 0) return cb({{ ok: false, error: "curl_failed" }});
      try {{
        return cb(JSON.parse(capturedOutput || "{{}}"));
      }} catch (e) {{
        return cb({{ ok: false, error: "bad_json", raw: capturedOutput }});
      }}
    }}
  }});
}}

function _enc(v) {{
  return encodeURIComponent(String(v === undefined || v === null ? "" : v)).replace(/'/g, "%27");
}}

defineVirtualDevice(DEVICE_ID, {{
  title: DEVICE_TITLE,
  cells: {{
    daemon_ok: {{ title: {{ en: "Daemon OK", ru: "Демон OK" }}, type: "switch", value: false, readonly: true, order: 10 }},
    ws_running: {{ title: {{ en: "WS Running", ru: "WS к колонке" }}, type: "switch", value: false, readonly: true, order: 20 }},
    playing: {{ title: {{ en: "Playing", ru: "Воспроизведение" }}, type: "switch", value: false, readonly: true, order: 30 }},
    current_volume: {{ title: {{ en: "Current Volume", ru: "Текущая громкость" }}, type: "value", value: 0, readonly: true, order: 40 }},
    tts_text: {{ title: {{ en: "TTS Text", ru: "Текст TTS" }}, type: "text", value: "", readonly: false, order: 100 }},
    tts_send: {{ title: {{ en: "Send TTS", ru: "Отправить TTS" }}, type: "pushbutton", value: false, order: 110 }},
    command_text: {{ title: {{ en: "Command Text", ru: "Текст команды" }}, type: "text", value: "", readonly: false, order: 120 }},
    command_send: {{ title: {{ en: "Send Command", ru: "Отправить команду" }}, type: "pushbutton", value: false, order: 130 }},
    volume: {{ title: {{ en: "Volume", ru: "Громкость" }}, type: "range", value: 35, min: 0, max: 100, order: 200 }},
    raw_json: {{ title: {{ en: "Raw JSON", ru: "RAW JSON" }}, type: "text", value: "{{\\"command\\":\\"stop\\"}}", readonly: false, order: 220 }},
    raw_send: {{ title: {{ en: "Send RAW", ru: "Отправить RAW" }}, type: "pushbutton", value: false, order: 230 }},
    play: {{ title: {{ en: "Play", ru: "Play" }}, type: "pushbutton", value: false, order: 300 }},
    pause: {{ title: {{ en: "Pause", ru: "Pause" }}, type: "pushbutton", value: false, order: 310 }},
    stop: {{ title: {{ en: "Stop", ru: "Stop" }}, type: "pushbutton", value: false, order: 320 }},
    next: {{ title: {{ en: "Next", ru: "Next" }}, type: "pushbutton", value: false, order: 330 }},
    prev: {{ title: {{ en: "Prev", ru: "Prev" }}, type: "pushbutton", value: false, order: 340 }},
    status_json: {{ title: {{ en: "Status JSON", ru: "Статус JSON" }}, type: "text", value: "{{}}", readonly: true, order: 400 }}
  }}
}});

defineRule(DEVICE_ID + "_tts_send", {{
  whenChanged: DEVICE_ID + "/tts_send",
  then: function (newValue) {{
    if (!newValue) return;
    var txt = String(dev[DEVICE_ID + "/tts_text"] || "").trim();
    if (!txt) return;
    _apiGet("/api/exec?action=tts&station=" + _enc(STATION_SELECTOR) + "&text=" + _enc(txt) + "&volume=35", function(){{}});
  }}
}});

defineRule(DEVICE_ID + "_command_send", {{
  whenChanged: DEVICE_ID + "/command_send",
  then: function (newValue) {{
    if (!newValue) return;
    var txt = String(dev[DEVICE_ID + "/command_text"] || "").trim();
    if (!txt) return;
    _apiGet("/api/exec?action=command&station=" + _enc(STATION_SELECTOR) + "&text=" + _enc(txt), function(){{}});
  }}
}});

defineRule(DEVICE_ID + "_volume_control", {{
  whenChanged: DEVICE_ID + "/volume",
  then: function () {{
    var level = parseInt(dev[DEVICE_ID + "/volume"], 10);
    if (isNaN(level)) level = 35;
    if (level < 0) level = 0;
    if (level > 100) level = 100;
    _apiGet("/api/exec?action=volume&station=" + _enc(STATION_SELECTOR) + "&level=" + _enc(level), function(){{}});
  }}
}});

function _player(cmd) {{
  return function (newValue) {{
    if (!newValue) return;
    _apiGet("/api/exec?action=player&station=" + _enc(STATION_SELECTOR) + "&cmd=" + _enc(cmd), function(){{}});
  }};
}}

defineRule(DEVICE_ID + "_play", {{ whenChanged: DEVICE_ID + "/play", then: _player("play") }});
defineRule(DEVICE_ID + "_pause", {{ whenChanged: DEVICE_ID + "/pause", then: _player("pause") }});
defineRule(DEVICE_ID + "_stop", {{ whenChanged: DEVICE_ID + "/stop", then: _player("stop") }});
defineRule(DEVICE_ID + "_next", {{ whenChanged: DEVICE_ID + "/next", then: _player("next") }});
defineRule(DEVICE_ID + "_prev", {{ whenChanged: DEVICE_ID + "/prev", then: _player("prev") }});

defineRule(DEVICE_ID + "_raw_send", {{
  whenChanged: DEVICE_ID + "/raw_send",
  then: function (newValue) {{
    if (!newValue) return;
    var raw = String(dev[DEVICE_ID + "/raw_json"] || "").trim();
    if (!raw) return;
    _apiGet("/api/exec?action=raw&station=" + _enc(STATION_SELECTOR) + "&payload=" + _enc(raw), function(){{}});
  }}
}});

defineRule(DEVICE_ID + "_status_poll", {{
  when: cron("@every 10s"),
  then: function () {{
    _apiGet("/api/status?station=" + _enc(STATION_SELECTOR), function (j) {{
      var ok = Number(j && j.ok ? 1 : 0);
      var running = Number(j && j.running ? 1 : 0);
      dev[DEVICE_ID + "/daemon_ok"] = !!ok;
      dev[DEVICE_ID + "/ws_running"] = !!running;
      if (j && j.playing !== undefined && j.playing !== null) dev[DEVICE_ID + "/playing"] = !!Number(j.playing ? 1 : 0);
      if (j && j.volume !== undefined && j.volume !== null) dev[DEVICE_ID + "/current_volume"] = Number(j.volume) || 0;
      dev[DEVICE_ID + "/status_json"] = JSON.stringify(j || {{}});
    }});
  }}
}});
"""


def _station_folder_name(station: StationRecord) -> str:
    """Return the per-station template folder name (derived from name, else station id)."""
    return safe_name(station.name or station.station_id, station.station_id or "station")


def write_station_templates(station: StationRecord, controller_host: str, target_dir: Path) -> Dict[str, str]:
    """Write VO.xml, VI.xml, wb-rules-station.js and README.md for one station.

    Files go to ``target_dir/<folder name>``. Returns the station id/name and
    the written paths relative to ``BASE_DIR``.
    """
    station_folder = target_dir / _station_folder_name(station)
    station_folder.mkdir(parents=True, exist_ok=True)
    selector = station.selector()
    display_name = station.name or station.station_id

    vo_path = station_folder / "VO.xml"
    vi_path = station_folder / "VI.xml"
    wb_path = station_folder / "wb-rules-station.js"
    readme_path = station_folder / "README.md"

    vo_path.write_text(build_vo_xml(controller_host, display_name, selector), encoding="utf-8")
    vi_path.write_text(build_vi_xml(controller_host, display_name, selector), encoding="utf-8")
    wb_path.write_text(build_wb_rules_js(controller_host, display_name, selector), encoding="utf-8")
    readme_path.write_text(
        "# Alice templates\n\n"
        f"- Station: {station.name}\n"
        f"- Station ID: {station.station_id}\n"
        f"- Selector: {selector}\n"
        f"- API: http://{controller_host}:{CONTROLLER_API_PORT}\n",
        encoding="utf-8",
    )
    return {
        "station": station.station_id,
        "name": station.name,
        "folder": str(station_folder.relative_to(BASE_DIR)),
        "vo": str(vo_path.relative_to(BASE_DIR)),
        "vi": str(vi_path.relative_to(BASE_DIR)),
        "wb": str(wb_path.relative_to(BASE_DIR)),
    }


def generate_templates(template_kind: str, controller_host: str) -> Dict[str, Any]:
    """Generate templates for every stored station and pack them into a zip.

    All template files are always written; ``template_kind`` (``"loxone"``,
    ``"wb-rules"`` or ``"all"``) selects which paths are reported and which
    files are zipped.

    Raises:
        RuntimeError: when no stations are stored yet.
    """
    stations = load_stations_records()
    if not stations:
        raise RuntimeError("stations.json is empty, run station refresh first")
    ensure_dirs()

    reported_keys = {
        "loxone": {"station", "name", "folder", "vo", "vi"},
        "wb-rules": {"station", "name", "folder", "wb"},
    }.get(template_kind)
    result: List[Dict[str, str]] = []
    for station in stations:
        files = write_station_templates(station, controller_host, LOXONE_DIR)
        if reported_keys is not None:
            files = {k: v for k, v in files.items() if k in reported_keys}
        result.append(files)

    zip_name = "loxone_templates.zip" if template_kind == "loxone" else "wb_rules_templates.zip"
    zip_path = LOXONE_DIR / zip_name
    wanted: List[str] = []
    if template_kind in ("loxone", "all"):
        wanted.extend(["VO.xml", "VI.xml", "README.md"])
    if template_kind in ("wb-rules", "all"):
        wanted.append("wb-rules-station.js")
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for station in stations:
            folder = LOXONE_DIR / _station_folder_name(station)
            for fn in wanted:
                p = folder / fn
                if p.exists():
                    zf.write(p, arcname=f"{folder.name}/{fn}")
    return {"ok": True, "templates": result, "zip": str(zip_path.relative_to(BASE_DIR))}


def is_wiren_board() -> bool:
    """Return True when ``/etc/wb-release`` or ``/etc/wb-rules`` exists."""
    return Path("/etc/wb-release").exists() or Path("/etc/wb-rules").exists()


def install_wb_rules(station_id: Optional[str] = None) -> Dict[str, Any]:
    """Copy generated wb-rules scripts into ``/etc/wb-rules`` and restart wb-rules.

    Args:
        station_id: install only this station; all stored stations when None.

    Raises:
        RuntimeError: when ``/etc/wb-rules`` is missing or the station id is unknown.
        subprocess.CalledProcessError: when the ``sudo`` fallback for the copy or
            the service restart fails.
    """
    target_dir = Path("/etc/wb-rules")
    if not target_dir.exists():
        raise RuntimeError("/etc/wb-rules not found")
    stations = load_stations_records()
    if station_id:
        stations = [s for s in stations if s.station_id == station_id]
        if not stations:
            raise RuntimeError(f"station not found: {station_id}")

    installed: List[str] = []
    for station in stations:
        src = LOXONE_DIR / _station_folder_name(station) / "wb-rules-station.js"
        if not src.exists():
            # Generate on demand; an existing file is installed as-is.
            write_station_templates(station, detect_controller_host(), LOXONE_DIR)
        dst = target_dir / f"alice_station_{safe_name(station.station_id, 'station')}.js"
        try:
            shutil.copy2(src, dst)
        except PermissionError:
            # Not running as root: retry through non-interactive sudo.
            subprocess.run(["sudo", "-n", "install", "-m", "0644", str(src), str(dst)], check=True)
        installed.append(str(dst))

    try:
        subprocess.run(["systemctl", "restart", "wb-rules"], check=True)
    except (subprocess.CalledProcessError, OSError):
        subprocess.run(["sudo", "-n", "systemctl", "restart", "wb-rules"], check=True)
    return {"ok": True, "installed": installed}


async def refresh_stations(token: str, timeout: float = 3.0) -> Dict[str, Any]:
    """Fetch cloud stations, discover local ones, merge them and save ``stations.json``.

    Raises:
        RuntimeError: when ``token`` is empty or the cloud request fails.
    """
    token = token.strip()
    if not token:
        raise RuntimeError("token is required")
    cloud = await fetch_cloud_stations(token)
    local = await discover_local(timeout=timeout)
    merged = merge_cloud_with_local(cloud, local)
    payload = serialize_stations(merged)
    save_json(STATIONS_PATH, payload)
    return {"ok": True, "stations": payload, "cloud_total": len(cloud), "local_total": len(local)}


def _error_response(exc: Exception) -> web.Response:
    """Build the 422 JSON error body shared by the API handlers."""
    return web.json_response({"ok": False, "error": str(exc)}, status=422)


async def _read_json_object(request: web.Request) -> Dict[str, Any]:
    """Return the request body as a dict; empty when it is not valid JSON or not an object."""
    try:
        body = await request.json()
    except ValueError:
        return {}
    return body if isinstance(body, dict) else {}


async def handle_index(_: web.Request) -> web.StreamResponse:
    """Serve ``web/index.html``."""
    return web.FileResponse(WEB_DIR / "index.html")


async def handle_static(request: web.Request) -> web.StreamResponse:
    """Serve a file from ``web/``; anything missing or outside that directory is a 404."""
    name = request.match_info.get("name", "")
    web_root = WEB_DIR.resolve()
    try:
        # match_info values are percent-decoded, so ``name`` may contain
        # "../" (sent as "..%2F"). Resolve and require the result to stay
        # inside web/ so files such as token.txt cannot be fetched.
        candidate = (web_root / name).resolve()
        candidate.relative_to(web_root)
    except (ValueError, OSError):
        raise web.HTTPNotFound() from None
    if not candidate.is_file():
        raise web.HTTPNotFound()
    return web.FileResponse(candidate)


async def api_status(_: web.Request) -> web.Response:
    """Return plugin status: token presence, controller host, stations and platform info."""
    stations = load_json(STATIONS_PATH, [])
    cfg = _load_config()
    return web.json_response(
        {
            "ok": True,
            "token_set": bool(read_token("")),
            "controller_host": detect_controller_host(),
            "stations": stations,
            "is_wiren_board": is_wiren_board(),
            "platform": platform.platform(),
            "config": {"controller_host": str(cfg.get("controller_host") or "")},
        }
    )


async def api_set_token(request: web.Request) -> web.Response:
    """Store the OAuth token from the JSON body (``token``); 422 when it is missing."""
    body = await _read_json_object(request)
    token = str(body.get("token") or "").strip()
    if not token:
        return web.json_response({"ok": False, "error": "token is required"}, status=422)
    save_token(token)
    return web.json_response({"ok": True})


async def api_set_host(request: web.Request) -> web.Response:
    """Store the controller host from the JSON body (``controller_host``); 422 when missing."""
    body = await _read_json_object(request)
    host = str(body.get("controller_host") or "").strip()
    if not host:
        return web.json_response({"ok": False, "error": "controller_host is required"}, status=422)
    save_controller_host(host)
    # Report the effective host: a loopback override is ignored by detection.
    return web.json_response({"ok": True, "controller_host": detect_controller_host()})


async def api_refresh(_: web.Request) -> web.Response:
    """Refresh the station list using the stored token."""
    try:
        out = await refresh_stations(read_token(""))
    except Exception as exc:  # API boundary: report any failure as JSON
        return _error_response(exc)
    return web.json_response(out)


async def api_templates_loxone(_: web.Request) -> web.Response:
    """Generate Loxone templates for all stations."""
    try:
        out = generate_templates("loxone", detect_controller_host())
    except Exception as exc:  # API boundary: report any failure as JSON
        return _error_response(exc)
    return web.json_response(out)


async def api_templates_wb(_: web.Request) -> web.Response:
    """Generate wb-rules templates for all stations."""
    try:
        out = generate_templates("wb-rules", detect_controller_host())
    except Exception as exc:  # API boundary: report any failure as JSON
        return _error_response(exc)
    return web.json_response(out)


async def api_download(request: web.Request) -> web.StreamResponse:
    """Send the template zip for ``kind`` (``loxone`` or ``wb-rules``), generating it if absent."""
    kind = request.match_info.get("kind", "")
    if kind not in {"loxone", "wb-rules"}:
        raise web.HTTPNotFound()
    zip_name = "loxone_templates.zip" if kind == "loxone" else "wb_rules_templates.zip"
    zip_path = LOXONE_DIR / zip_name
    if not zip_path.exists():
        try:
            generate_templates(kind, detect_controller_host())
        except Exception as exc:  # API boundary: same 422 JSON as the other handlers
            return _error_response(exc)
    return web.FileResponse(zip_path)


async def api_install_wb(request: web.Request) -> web.Response:
    """Install wb-rules templates (optionally for one ``station_id``) on a Wiren Board."""
    if not is_wiren_board():
        return web.json_response({"ok": False, "error": "not a Wiren Board environment"}, status=422)
    body = await _read_json_object(request)
    station_id = str(body.get("station_id") or "").strip() or None
    try:
        out = install_wb_rules(station_id)
    except Exception as exc:  # API boundary: report any failure as JSON
        return _error_response(exc)
    return web.json_response(out)


async def run_web(host: str, port: int) -> int:
    """Start the web UI and serve until the task is cancelled."""
    app = web.Application()
    app.router.add_get("/", handle_index)
    app.router.add_get("/web/{name}", handle_static)
    app.router.add_get("/api/status", api_status)
    app.router.add_post("/api/token", api_set_token)
    app.router.add_post("/api/controller-host", api_set_host)
    app.router.add_post("/api/stations/refresh", api_refresh)
    app.router.add_post("/api/templates/loxone", api_templates_loxone)
    app.router.add_post("/api/templates/wb-rules", api_templates_wb)
    app.router.add_get("/api/download/{kind}", api_download)
    app.router.add_post("/api/wb-rules/install", api_install_wb)

    runner = web.AppRunner(app)
    await runner.setup()
    try:
        site = web.TCPSite(runner, host=host, port=port)
        await site.start()
        print(f"alice plugin web listening on http://{host}:{port}")
        while True:
            await asyncio.sleep(3600)
    finally:
        # Release the listening socket on cancellation (e.g. Ctrl+C) or startup failure.
        await runner.cleanup()


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with its sub-commands."""
    p = argparse.ArgumentParser(description="Standalone Alice stations plugin")
    p.add_argument("--token", default="", help="Yandex OAuth token")
    sub = p.add_subparsers(dest="cmd", required=True)

    sp = sub.add_parser("stations-refresh", help="1) get stations list")
    sp.add_argument("--timeout", type=float, default=3.0)

    sp = sub.add_parser("templates-loxone", help="2) generate Loxone templates in ./loxone")
    sp.add_argument("--controller-host", default="")

    sp = sub.add_parser("templates-wb-rules", help="3) generate wb-rules templates in ./loxone")
    sp.add_argument("--controller-host", default="")

    sp = sub.add_parser("wb-install", help="install wb-rules templates to /etc/wb-rules")
    sp.add_argument("--station-id", default="")

    sp = sub.add_parser("web", help="run web UI")
    sp.add_argument("--host", default="0.0.0.0")
    sp.add_argument("--port", type=int, default=9140)
    return p


async def main_async() -> int:
    """Parse the command line, run the selected sub-command and return the exit code."""
    ensure_dirs()
    parser = build_parser()
    args = parser.parse_args()

    if args.cmd == "stations-refresh":
        token = read_token(args.token)
        out = await refresh_stations(token, timeout=args.timeout)
        print(json.dumps(out, ensure_ascii=False, indent=2))
        return 0

    if args.cmd == "templates-loxone":
        host = (args.controller_host or "").strip() or detect_controller_host()
        out = generate_templates("loxone", host)
        print(json.dumps(out, ensure_ascii=False, indent=2))
        return 0

    if args.cmd == "templates-wb-rules":
        host = (args.controller_host or "").strip() or detect_controller_host()
        out = generate_templates("wb-rules", host)
        print(json.dumps(out, ensure_ascii=False, indent=2))
        return 0

    if args.cmd == "wb-install":
        out = install_wb_rules(args.station_id.strip() or None)
        print(json.dumps(out, ensure_ascii=False, indent=2))
        return 0

    if args.cmd == "web":
        token = (args.token or "").strip()
        if token:
            # A token given on the command line is persisted for the web UI.
            save_token(token)
        return await run_web(args.host, args.port)

    parser.print_help()
    return 1


def main() -> int:
    """Synchronous entry point; returns 130 on Ctrl+C."""
    try:
        return asyncio.run(main_async())
    except KeyboardInterrupt:
        return 130


if __name__ == "__main__":
    raise SystemExit(main())