#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Yandex Station control daemon with a TCP/HTTP API.

The module handles runtime commands:
- discovering stations on the local network via mDNS;
- opening a websocket to a station and sending payload commands;
- publishing the `/api/exec` and `/api/status` HTTP API for external
  integrations.
"""

import argparse
import asyncio
import html
import json
import math
import os
import ssl
import sys
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import aiohttp
from aiohttp import web
from zeroconf import ServiceBrowser, Zeroconf

MDNS_SERVICE = "_yandexio._tcp.local."
QUASAR_TOKEN_URL = "https://quasar.yandex.net/glagol/token"
YANDEX_INFO_URL = "https://api.iot.yandex.net/v1.0/user/info"


@dataclass
class Station:
    """A station discovered via mDNS and the websocket endpoint to reach it."""

    device_id: str
    platform: str
    host: str
    port: int
    name: str = ""


class _Listener:
    """mDNS event listener that collects discovered stations."""

    def __init__(self):
        # device_id -> Station; a re-announced station overwrites its entry.
        self.stations: Dict[str, Station] = {}

    def remove_service(self, zeroconf: Zeroconf, service_type: str, name: str) -> None:
        # Removals are ignored: discover() only needs a point-in-time snapshot.
        pass

    def add_service(self, zeroconf: Zeroconf, service_type: str, name: str) -> None:
        self._handle(zeroconf, service_type, name)

    def update_service(self, zeroconf: Zeroconf, service_type: str, name: str) -> None:
        self._handle(zeroconf, service_type, name)

    def _handle(self, zeroconf: Zeroconf, service_type: str, name: str) -> None:
        """Resolve one announced service and record it if it is a usable station.

        Services without a device id, platform, address or port are skipped.
        """
        info = zeroconf.get_service_info(service_type, name)
        if not info:
            return

        # Zeroconf returns "raw" TXT properties as bytes: normalize them to str
        # so everything downstream is uniform and serializes without trouble.
        props: Dict[str, Any] = {}
        for k, v in (info.properties or {}).items():
            try:
                kk = k.decode("utf-8") if isinstance(k, (bytes, bytearray)) else str(k)
                vv = v.decode("utf-8", errors="ignore") if isinstance(v, (bytes, bytearray)) else v
                props[kk] = vv
            except Exception:
                continue

        device_id = str(props.get("deviceId") or props.get("device_id") or "").strip()
        platform = str(props.get("platform") or "").strip()
        if not device_id or not platform:
            return

        try:
            addr = info.addresses[0]
            # Join the packed address bytes as decimal octets (a dotted quad
            # for a 4-byte IPv4 address).
            host = ".".join(str(b) for b in addr)
        except Exception:
            return

        port = int(info.port or 0)
        if not port:
            return

        name_friendly = str(props.get("name") or props.get("deviceName") or name).strip()
        self.stations[device_id] = Station(
            device_id=device_id,
            platform=platform,
            host=host,
            port=port,
            name=name_friendly,
        )


async def discover(timeout_sec: float = 2.5) -> List[Station]:
    """Browse mDNS for `timeout_sec` seconds and return a snapshot of found stations."""
    zc = Zeroconf()
    listener = _Listener()
    browser = ServiceBrowser(zc, MDNS_SERVICE, listener)
    try:
        await asyncio.sleep(timeout_sec)
    finally:
        # Best-effort teardown: a failure here must not hide the results.
        try:
            browser.cancel()
        except Exception:
            pass
        try:
            zc.close()
        except Exception:
            pass
    return list(listener.stations.values())


def _ssl_ctx() -> ssl.SSLContext:
    """Build a TLS context for station websockets with verification disabled."""
    # Stations are addressed by raw LAN IP, so hostname and certificate checks
    # are turned off (presumably the station presents a self-signed
    # certificate — confirm).
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx


def _volume01_from_0_100(v: float) -> float:
    """Convert a 0..100 volume level to the 0..1 scale sent to the station.

    Raises:
        RuntimeError: if *v* is outside 0..100 or is NaN.
    """
    # Negated chained comparison so NaN (which fails every comparison) is
    # rejected instead of being forwarded to the station as "NaN".
    if not 0 <= v <= 100:
        raise RuntimeError("volume must be 0..100")
    return round(v / 100.0, 3)


def _speaker_value(text: str, voice: str = "", effect: str = "", audio: str = "") -> str:
    """Build the text for the repeat-after-me slot, with an optional <speaker> tag.

    When any of *voice*, *effect* or *audio* is given, the text is prefixed with
    ``<speaker voice='..' effect='..' audio='..'>`` (only the given attributes,
    HTML-escaped). Without attributes the plain text is returned.
    """
    attrs: List[str] = []
    if voice:
        attrs.append(f"voice='{html.escape(str(voice), quote=True)}'")
    if effect:
        attrs.append(f"effect='{html.escape(str(effect), quote=True)}'")
    if audio:
        attrs.append(f"audio='{html.escape(str(audio), quote=True)}'")
    if not attrs:
        return f"{text or ''}"
    attr_str = " " + " ".join(attrs)
    return f"<speaker{attr_str}>{text or ''}"


def pick_station(stations: List[Station], selector: str) -> Station:
    """Pick a station using the most common selector formats.

    Matching order: exact device_id, then host or host:port, then a
    case-insensitive substring of the station name.

    Raises:
        RuntimeError: if the selector is empty or nothing matches.
    """
    selector = (selector or "").strip()
    if not selector:
        raise RuntimeError("empty selector")
    for s in stations:
        if s.device_id == selector:
            return s
    for s in stations:
        if s.host == selector or f"{s.host}:{s.port}" == selector:
            return s
    low = selector.lower()
    for s in stations:
        if low in (s.name or "").lower():
            return s
    raise RuntimeError(f"station not found by selector: {selector}")


async def get_conversation_token(session: aiohttp.ClientSession, oauth_token: str, station: Station) -> str:
    """Fetch a conversation token for *station* from the Quasar Glagol endpoint.

    Raises:
        RuntimeError: if the response is not JSON, or is not an object with
            ``status == "ok"`` and a non-empty ``token``.
    """
    params = {"device_id": station.device_id, "platform": station.platform}
    headers = {"Authorization": f"oauth {oauth_token}"}
    async with session.get(QUASAR_TOKEN_URL, params=params, headers=headers,
                           timeout=aiohttp.ClientTimeout(total=10)) as r:
        text = await r.text()
    try:
        data = json.loads(text)
    except Exception as exc:
        raise RuntimeError(f"Quasar token: bad json: {text[:200]}") from exc
    if not isinstance(data, dict) or data.get("status") != "ok" or not data.get("token"):
        raise RuntimeError(f"Quasar token: unexpected response: {data}")
    return str(data["token"])


async def ws_send(session: aiohttp.ClientSession, station: Station, conv_token: str,
                  payload: dict, wait_sec: float = 5.0) -> dict:
    """One-shot: open a websocket, send *payload*, and wait for the matching reply.

    Returns the first JSON message whose ``requestId`` or ``id`` equals the
    request id, or ``{"status": "timeout"}`` if none arrives within *wait_sec*
    or the socket closes.
    """
    url = f"wss://{station.host}:{station.port}"
    req_id = str(uuid.uuid4())
    msg = {
        "conversationToken": conv_token,
        "id": req_id,
        "payload": payload,
        "sentTime": int(round(time.time() * 1000)),
    }
    # NOTE(review): aiohttp documents ws_connect's `timeout` as a float close
    # timeout (ClientWSTimeout in aiohttp >= 3.10), not a ClientTimeout —
    # confirm against the installed aiohttp version.
    async with session.ws_connect(url, ssl=_ssl_ctx(), heartbeat=55,
                                  timeout=aiohttp.ClientTimeout(total=10)) as ws:
        await ws.send_json(msg)
        deadline = time.time() + wait_sec
        while True:
            # Wait only for the time left, so unrelated traffic cannot push
            # the total wait past the deadline.
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            try:
                in_msg = await ws.receive(timeout=remaining)
            except asyncio.TimeoutError:
                break
            if in_msg.type == aiohttp.WSMsgType.TEXT:
                try:
                    data = json.loads(in_msg.data)
                except Exception:
                    continue
                if data.get("requestId") == req_id or data.get("id") == req_id:
                    return data
            elif in_msg.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSED,
                                 aiohttp.WSMsgType.ERROR):
                break
    return {"status": "timeout"}


async def do_list(args) -> int:
    """Print stations found via mDNS, one tab-separated line per station."""
    stations = await discover(args.timeout)
    for s in stations:
        print(f"{s.name}\tdevice_id={s.device_id}\tplatform={s.platform}\t{s.host}:{s.port}")
    return 0


async def do_stations_cloud(args) -> int:
    """Print the account's speaker devices (name, id, room, model) as JSON.

    Raises:
        RuntimeError: if no token is available or the API returns an error.
    """
    oauth = args.token or os.environ.get("YANDEX_TOKEN", "").strip()
    if not oauth:
        raise RuntimeError("No token. Use --token or env YANDEX_TOKEN")
    async with aiohttp.ClientSession() as session:
        headers = {"Authorization": f"OAuth {oauth}"}
        async with session.get(YANDEX_INFO_URL, headers=headers,
                               timeout=aiohttp.ClientTimeout(total=15)) as r:
            status = r.status
            data = await r.json(content_type=None)
    # Without this check an auth failure would silently print an empty list.
    if status >= 400 or not isinstance(data, dict):
        raise RuntimeError(f"IoT API: unexpected response (HTTP {status}): {data}")

    rooms = {}
    for rm in (data.get("rooms") or []):
        rid = str(rm.get("id") or "")
        if rid:
            rooms[rid] = str(rm.get("name") or "")

    out = []
    for d in (data.get("devices") or []):
        tp = str(d.get("type") or "").lower()
        # Any type containing "speaker" (this also covers "smart_speaker").
        if "speaker" not in tp:
            continue
        rid = str(d.get("room") or "")
        out.append({
            "name": str(d.get("name") or ""),
            "id": str(d.get("id") or ""),
            "room": rooms.get(rid, ""),
            "model": str(d.get("model") or d.get("type") or ""),
        })
    print(json.dumps(out, ensure_ascii=False, indent=2))
    return 0


class StationConn:
    """Long-lived connection to a single station.

    It holds:
    - the websocket and the conversation token;
    - a cache of player state (playing/volume) for fast status replies;
    - a lock so concurrent commands do not interleave on one websocket.
    """

    def __init__(self, station: Station, oauth_token: str, wait_sec: float):
        self.station = station
        self.oauth_token = oauth_token
        self.wait_sec = wait_sec  # how long send() waits for the matching reply
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._conv_token: str = ""
        self._conv_exp_ts: float = 0.0
        self._lock = asyncio.Lock()
        self._playing: Optional[int] = None
        self._volume: Optional[int] = None
        self._last_probe_ts: float = 0.0

    @property
    def connected(self) -> bool:
        """True when a websocket exists and is not closed."""
        return self._ws is not None and not self._ws.closed

    @property
    def playing(self) -> Optional[int]:
        """Cached playing flag as 1/0, or None if unknown."""
        if self._playing is None:
            return None
        return 1 if self._playing else 0

    @property
    def volume(self) -> Optional[int]:
        """Cached volume as an int clamped to 0..100, or None if unknown."""
        if self._volume is None:
            return None
        v = int(self._volume)
        if v < 0:
            return 0
        if v > 100:
            return 100
        return v

    @staticmethod
    def _to_playing_flag(value: Any) -> Optional[int]:
        """Map a playing-state value to 1 (playing), 0 (not playing) or None.

        Accepts bools, the numbers 0/1/2 (2 counts as playing), and common
        state words such as "playing"/"paused" or numeric strings.
        """
        if isinstance(value, bool):
            return 1 if value else 0
        if isinstance(value, (int, float)):
            v = int(round(float(value)))
            if v == 2:
                return 1
            if v in (0, 1):
                return v
        if isinstance(value, str):
            low = value.strip().lower()
            if low in {"playing", "play", "started", "start", "on", "true", "yes"}:
                return 1
            if low in {"paused", "stopped", "stop", "idle", "off", "false", "no"}:
                return 0
            try:
                vv = int(float(low))
                if vv == 2:
                    return 1
                if vv in (0, 1):
                    return vv
            except Exception:
                return None
        return None

    @classmethod
    def _infer_playing(cls, obj: Any) -> Optional[int]:
        """Depth-first search of nested dicts/lists for the first playing flag.

        In each dict the keys playing/isPlaying/playState/playerState/state are
        tried first, then every value is searched recursively.
        """
        if isinstance(obj, dict):
            for k in ("playing", "isPlaying", "playState", "playerState", "state"):
                if k in obj:
                    out = cls._to_playing_flag(obj.get(k))
                    if out is not None:
                        return out
            for v in obj.values():
                out = cls._infer_playing(v)
                if out is not None:
                    return out
        elif isinstance(obj, list):
            for v in obj:
                out = cls._infer_playing(v)
                if out is not None:
                    return out
        return None

    @staticmethod
    def _to_volume_percent(value: Any) -> Optional[int]:
        """Normalize a volume value to an int percentage in 0..100, or None.

        Numbers in 0..1 (including the ints 0 and 1) are treated as fractions
        and scaled by 100; other numbers are clamped to 0..100. Numeric strings
        (dot or comma decimal separator) are parsed. NaN and infinities give
        None.
        """
        if isinstance(value, bool):
            return 100 if value else 0
        if isinstance(value, (int, float)):
            v = float(value)
            if not math.isfinite(v):
                # int(round(nan/inf)) would raise; treat as "unknown".
                return None
            if 0.0 <= v <= 1.0:
                v = v * 100.0
            iv = int(round(v))
            if iv < 0:
                iv = 0
            if iv > 100:
                iv = 100
            return iv
        if isinstance(value, str):
            s = value.strip().replace(",", ".")
            if not s:
                return None
            try:
                return StationConn._to_volume_percent(float(s))
            except Exception:
                return None
        return None

    @classmethod
    def _infer_volume(cls, obj: Any) -> Optional[int]:
        """Depth-first search of nested dicts/lists for the first volume value.

        In each dict the keys volume/currentVolume/volumeLevel/level are tried
        first, then every value is searched recursively.
        """
        if isinstance(obj, dict):
            for k in ("volume", "currentVolume", "volumeLevel", "level"):
                if k in obj:
                    out = cls._to_volume_percent(obj.get(k))
                    if out is not None:
                        return out
            for v in obj.values():
                out = cls._infer_volume(v)
                if out is not None:
                    return out
        elif isinstance(obj, list):
            for v in obj:
                out = cls._infer_volume(v)
                if out is not None:
                    return out
        return None

    async def start(self):
        """Create the HTTP session if needed and (re)open the websocket.

        Runs under the command lock so that a concurrent start() (for example
        from a status poll) and send() cannot both open a websocket and orphan
        one of them.
        """
        if self._session is None:
            self._session = aiohttp.ClientSession()
        async with self._lock:
            await self._ensure_ws()

    async def close(self):
        """Close the websocket and the HTTP session, ignoring close errors."""
        try:
            if self._ws is not None:
                await self._ws.close()
        except Exception:
            pass
        self._ws = None
        try:
            if self._session is not None:
                await self._session.close()
        except Exception:
            pass
        self._session = None

    async def _ensure_conv_token(self):
        """Fetch a new conversation token if none is cached or it is considered expired."""
        # The token is short-lived: refresh it ahead of time so a command is
        # not rejected mid-send. The 60 s validity is a local assumption; the
        # server-side lifetime is not read from the response.
        now = time.time()
        if self._conv_token and self._conv_exp_ts > now:
            return
        assert self._session is not None
        self._conv_token = await get_conversation_token(self._session, self.oauth_token, self.station)
        self._conv_exp_ts = now + 60.0

    async def _ensure_ws(self):
        """Make sure the conversation token is fresh and the websocket is open."""
        assert self._session is not None
        await self._ensure_conv_token()
        if self.connected:
            return
        url = f"wss://{self.station.host}:{self.station.port}"
        # NOTE(review): aiohttp documents ws_connect's `timeout` as a float
        # close timeout (ClientWSTimeout in aiohttp >= 3.10), not a
        # ClientTimeout — confirm against the installed aiohttp version.
        self._ws = await self._session.ws_connect(
            url,
            ssl=_ssl_ctx(),
            heartbeat=55,
            timeout=aiohttp.ClientTimeout(total=10),
        )

    async def send(self, payload: dict) -> dict:
        """Send a command to the station and wait for the reply with the same request id.

        Updates the cached playing/volume state from the reply (or from the
        command itself when the reply says nothing). Returns the reply, or
        ``{"status": "timeout", "id": <request id>}`` if none arrives within
        ``wait_sec`` or the socket closes.
        """
        async with self._lock:
            await self._ensure_ws()
            req_id = str(uuid.uuid4())
            cmd = str(payload.get("command") or "").strip().lower()
            sent_volume = None
            if cmd == "setvolume":
                sent_volume = self._to_volume_percent(payload.get("volume"))
                if sent_volume is not None:
                    # Optimistic update: the status endpoint can expose the
                    # requested level immediately.
                    self._volume = sent_volume
            msg = {
                "conversationToken": self._conv_token,
                "id": req_id,
                "payload": payload,
                "sentTime": int(round(time.time() * 1000)),
            }
            try:
                assert self._ws is not None
                await self._ws.send_json(msg)
            except Exception:
                # The websocket may have dropped between commands: do one
                # controlled reconnect and resend.
                try:
                    if self._ws is not None:
                        await self._ws.close()
                except Exception:
                    pass
                self._ws = None
                await self._ensure_ws()
                assert self._ws is not None
                # _ensure_ws() may have refreshed the conversation token.
                msg["conversationToken"] = self._conv_token
                await self._ws.send_json(msg)

            deadline = time.time() + self.wait_sec
            while True:
                # Wait only for the time left, so unrelated messages cannot
                # push the total wait past the deadline.
                remaining = deadline - time.time()
                if remaining <= 0:
                    break
                try:
                    in_msg = await self._ws.receive(timeout=remaining)
                except Exception:
                    break
                if in_msg.type == aiohttp.WSMsgType.TEXT:
                    try:
                        data = json.loads(in_msg.data)
                    except Exception:
                        continue
                    if data.get("requestId") == req_id or data.get("id") == req_id:
                        inferred = self._infer_playing(data)
                        if inferred is not None:
                            self._playing = inferred
                        elif cmd == "play":
                            self._playing = 1
                        elif cmd == "stop":
                            self._playing = 0
                        inferred_volume = self._infer_volume(data)
                        if inferred_volume is not None:
                            self._volume = inferred_volume
                        elif sent_volume is not None:
                            self._volume = sent_volume
                        return data
                elif in_msg.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSED,
                                     aiohttp.WSMsgType.ERROR):
                    break
            if sent_volume is not None:
                self._volume = sent_volume
            return {"status": "timeout", "id": req_id}

    async def probe_state(self) -> None:
        """Try to refresh playing/volume by sending a set of status commands.

        Rate-limited to once per 5 seconds. Candidates are tried in order until
        a reply yields a playing flag or a volume; errors are ignored.
        """
        now = time.time()
        if now - self._last_probe_ts < 5.0:
            return
        self._last_probe_ts = now
        candidates = [
            {"command": "getState"},
            {"command": "getStatus"},
            {"command": "status"},
            {"command": "playerState"},
        ]
        for payload in candidates:
            try:
                data = await self.send(payload)
            except Exception:
                continue
            if self._infer_playing(data) is not None or self._infer_volume(data) is not None:
                break


async def serve_daemon(args) -> int:
    """Run the main daemon: a line-based JSON TCP server plus the HTTP API.

    Discovers stations, opens a persistent StationConn to each selected one,
    then serves until cancelled. Connections and servers are torn down on exit.

    Raises:
        RuntimeError: on a missing/invalid token, an unknown selector, or when
            no stations are found.
    """
    oauth = args.token or os.environ.get("YANDEX_TOKEN", "").strip()
    if not oauth:
        raise RuntimeError("No token. Use --token or env YANDEX_TOKEN")

    stations = await discover(args.timeout)
    selected: List[Station] = []
    if args.stations:
        for sel in args.stations.split(","):
            sel = sel.strip()
            if sel:  # tolerate "a,,b" and trailing commas
                selected.append(pick_station(stations, sel))
    else:
        selected = stations
    if not selected:
        raise RuntimeError("No stations found")

    conns: Dict[str, StationConn] = {}
    try:
        for st in selected:
            if st.device_id in conns:
                # Two selectors resolved to the same station: keep the first
                # connection rather than overwriting (and leaking) it.
                continue
            conn = StationConn(st, oauth, args.wait)
            # Register before start() so a failed start is still cleaned up.
            conns[st.device_id] = conn
            await conn.start()
    except Exception as exc:
        for c in conns.values():
            try:
                await c.close()
            except Exception:
                pass
        msg = str(exc)
        if "AUTH_TOKEN_INVALID" in msg:
            raise RuntimeError("YANDEX_OAUTH_TOKEN invalid: update token and retry") from exc
        raise

    async def resolve_conn(station_sel: str) -> Optional[StationConn]:
        """Find the active connection for a user-supplied station selector.

        Uses the same matching rules as pick_station (device_id, host or
        host:port, then name substring). Returns None when nothing matches.
        """
        station_sel = (station_sel or "").strip()
        if not station_sel:
            return None
        conn = conns.get(station_sel)
        if conn is not None:
            return conn
        try:
            st = pick_station([c.station for c in conns.values()], station_sel)
        except RuntimeError:
            return None
        return conns.get(st.device_id)

    async def execute_action(req: Dict[str, Any]) -> Dict[str, Any]:
        """Validate a request dict, turn it into a station payload and send it.

        Returns ``{"ok": True, "result": ...}`` or ``{"ok": False, "error": ...}``.
        """
        station_sel = str(req.get("station") or "").strip()
        if not station_sel:
            return {"ok": False, "error": "station is required"}
        action = str(req.get("action") or "").strip().lower()
        conn = await resolve_conn(station_sel)
        if conn is None:
            return {"ok": False, "error": "station not found"}
        try:
            if action == "tts":
                text = str(req.get("text") or "").strip()
                tts_voice = str(req.get("tts_voice") or "").strip()
                tts_effect = str(req.get("tts_effect") or "").strip()
                tts_audio = str(req.get("tts_audio") or "").strip()
                if tts_audio:
                    # Keep only the file name: drop any (Windows or POSIX) path.
                    tts_audio = os.path.basename(tts_audio.replace("\\", "/"))
                if not text and not tts_audio:
                    raise RuntimeError("text or tts_audio is required")
                volume = req.get("volume")
                if volume is not None and str(volume).strip() != "":
                    vol_payload = {"command": "setVolume", "volume": _volume01_from_0_100(float(volume))}
                    await conn.send(vol_payload)
                speaker_value = _speaker_value(text, voice=tts_voice, effect=tts_effect, audio=tts_audio)
                payload = {
                    "command": "serverAction",
                    "serverActionEventPayload": {
                        "type": "server_action",
                        "name": "update_form",
                        "payload": {
                            "form_update": {
                                "name": "personal_assistant.scenarios.repeat_after_me",
                                "slots": [{"type": "string", "name": "request", "value": speaker_value}],
                            },
                            "resubmit": True,
                        },
                    },
                }
            elif action == "command":
                text = str(req.get("text") or "").strip()
                if not text:
                    raise RuntimeError("text is required")
                payload = {"command": "sendText", "text": text}
            elif action == "player":
                cmd = str(req.get("cmd") or "").strip().lower()
                if cmd not in {"play", "stop", "next", "prev"}:
                    raise RuntimeError("cmd must be play|stop|next|prev")
                payload = {"command": cmd}
            elif action == "volume":
                level = req.get("level")
                if level is None or str(level).strip() == "":
                    raise RuntimeError("level is required")
                payload = {"command": "setVolume", "volume": _volume01_from_0_100(float(level))}
            elif action == "raw":
                payload = req.get("payload")
                if isinstance(payload, list):
                    if not payload:
                        raise RuntimeError("payload list is empty")
                    results = []
                    for idx, p in enumerate(payload):
                        if not isinstance(p, dict):
                            raise RuntimeError(f"payload[{idx}] must be object")
                        results.append(await conn.send(p))
                    return {"ok": True, "result": results}
                if not isinstance(payload, dict):
                    raise RuntimeError("payload must be object or array of objects")
            else:
                raise RuntimeError("action must be tts|command|player|volume|raw")

            result = await conn.send(payload)
            return {"ok": True, "result": result}
        except Exception as e:
            return {"ok": False, "error": str(e)}

    async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        """Serve one TCP client: read one JSON line, reply with one JSON line."""
        try:
            try:
                line = await reader.readline()
                if not line:
                    return
                req = json.loads(line.decode("utf-8", errors="ignore"))
                if not isinstance(req, dict):
                    raise ValueError("request must be a JSON object")
            except Exception as e:
                resp = {"ok": False, "error": f"bad request: {e}"}
            else:
                resp = await execute_action(req)
            writer.write((json.dumps(resp, ensure_ascii=False) + "\n").encode("utf-8"))
            await writer.drain()
        except OSError:
            # The client went away before reading the reply; nothing to do.
            pass
        finally:
            writer.close()

    async def http_exec(request: web.Request) -> web.Response:
        """HTTP wrapper over execute_action: merge query/form/JSON into one dict.

        Query parameters override body fields; a string ``payload`` is parsed
        as JSON. Responds 200 on success, 422 on a failed action, 400 on
        malformed JSON.
        """
        data: Dict[str, Any] = {}
        if request.method == "POST":
            if request.content_type and "application/json" in request.content_type:
                try:
                    body = await request.json()
                except ValueError:
                    return web.json_response({"ok": False, "error": "bad json"}, status=400)
                if isinstance(body, dict):
                    data.update(body)
            else:
                body = await request.post()
                data.update({k: v for k, v in body.items()})
        data.update({k: v for k, v in request.query.items()})
        if "payload" in data and isinstance(data["payload"], str):
            payload_raw = str(data["payload"]).strip()
            if payload_raw:
                try:
                    data["payload"] = json.loads(payload_raw)
                except Exception:
                    return web.json_response({"ok": False, "error": "bad payload json"}, status=400)
        result = await execute_action(data)
        return web.json_response(result, status=200 if result.get("ok") else 422)

    async def http_status(request: web.Request) -> web.Response:
        """Return connection state plus the cached playing/volume for a station."""
        station_sel = str(request.query.get("station") or "").strip()
        conn = await resolve_conn(station_sel)
        if conn is None:
            return web.json_response({"ok": False, "error": "station not found"}, status=404)
        # Keep the websocket persistent: try to restore it on each status poll.
        try:
            await conn.start()
        except Exception:
            pass
        try:
            await conn.probe_state()
        except Exception:
            pass
        running = 1 if conn.connected else 0
        return web.json_response({
            "ok": 1,
            "running": running,
            "playing": conn.playing,
            "volume": conn.volume,
            "station": conn.station.device_id,
        })

    async def http_health(_request: web.Request) -> web.Response:
        return web.json_response({"ok": 1})

    app = web.Application()
    app.router.add_get("/health", http_health)
    app.router.add_get("/api/status", http_status)
    app.router.add_get("/api/exec", http_exec)
    app.router.add_post("/api/exec", http_exec)

    server: Any = None
    http_runner: Optional[web.AppRunner] = None
    try:
        server = await asyncio.start_server(handle_client, host=args.host, port=args.port)
        addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets or [])
        print(f"tcp server listening on {addrs}")

        http_runner = web.AppRunner(app)
        await http_runner.setup()
        http_site = web.TCPSite(http_runner, host=args.http_host, port=args.http_port)
        await http_site.start()
        print(f"http server listening on {(args.http_host, args.http_port)}")

        async with server:
            await server.serve_forever()
    finally:
        # Also runs when startup fails part-way (e.g. a port already in use).
        if server is not None:
            server.close()
        if http_runner is not None:
            try:
                await http_runner.cleanup()
            except Exception:
                pass
        for c in conns.values():
            await c.close()
    return 0


async def client_call(args) -> int:
    """Send one request to a running daemon over TCP and print its JSON reply.

    Returns 0 when a reply line was received, 2 when the daemon closed the
    connection without replying.
    """
    req: Dict[str, Any] = {"station": args.station, "action": args.action}
    if args.action in {"tts", "command"}:
        req["text"] = args.text
    elif args.action == "player":
        req["cmd"] = args.cmd
    elif args.action == "volume":
        req["level"] = float(args.level)
    elif args.action == "raw":
        req["payload"] = json.loads(args.payload)

    reader, writer = await asyncio.open_connection(args.host, args.port)
    try:
        writer.write((json.dumps(req, ensure_ascii=False) + "\n").encode("utf-8"))
        await writer.drain()
        line = await reader.readline()
    finally:
        writer.close()
    if not line:
        print(json.dumps({"ok": False, "error": "no response"}, ensure_ascii=False))
        return 2
    print(line.decode("utf-8", errors="ignore").rstrip("\n"))
    return 0


def build_parser() -> argparse.ArgumentParser:
    """Build the CLI: list | stations-cloud | serve | client."""
    p = argparse.ArgumentParser(prog="yastation.py", description="Yandex station helper")
    p.add_argument("--timeout", type=float, default=2.5)
    p.add_argument("--token", type=str, default="")
    p.add_argument("--wait", type=float, default=5.0)
    # dest is "mode" (not "action") so it cannot collide with the `client`
    # sub-command's positional `action` argument.
    sub = p.add_subparsers(dest="mode", required=True)

    sp = sub.add_parser("list")
    sp.set_defaults(fn=do_list)

    sp = sub.add_parser("stations-cloud")
    sp.set_defaults(fn=do_stations_cloud)

    sp = sub.add_parser("serve")
    sp.add_argument("--host", default="127.0.0.1")
    sp.add_argument("--port", type=int, default=9123)
    sp.add_argument("--http-host", default="0.0.0.0")
    sp.add_argument("--http-port", type=int, default=9124)
    sp.add_argument("--stations", default="")
    sp.set_defaults(fn=serve_daemon)

    sp = sub.add_parser("client")
    sp.add_argument("--host", default="127.0.0.1")
    sp.add_argument("--port", type=int, default=9123)
    sp.add_argument("--station", required=True)
    sp.add_argument("action", help="tts|command|player|volume|raw")
    sp.add_argument("--text", default="")
    sp.add_argument("--cmd", default="")
    sp.add_argument("--level", default="0")
    sp.add_argument("--payload", default="{}")
    sp.set_defaults(fn=client_call)
    return p


def main() -> int:
    """CLI entry point: parse arguments and run the selected sub-command."""
    parser = build_parser()
    args = parser.parse_args()
    try:
        return asyncio.run(args.fn(args))
    except KeyboardInterrupt:
        return 130
    except RuntimeError as exc:
        # RuntimeError carries the user-facing messages raised in this module
        # (missing token, unknown station, ...): show them without a traceback.
        print(f"error: {exc}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())