Files
local_yandex_station/yastation.py

802 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Демон управления Яндекс Станциями + TCP/HTTP API.
Модуль отвечает за runtime-команды:
- поиск станций в локальной сети через mDNS;
- открытие websocket к станции и отправку payload-команд;
- публикацию API `/api/exec` и `/api/status` для внешней интеграции.
"""
import argparse
import asyncio
import html
import ipaddress
import json
import math
import os
import ssl
import sys
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import aiohttp
from aiohttp import web
from zeroconf import ServiceBrowser, Zeroconf
# mDNS service type browsed for when discovering stations on the LAN.
MDNS_SERVICE: str = "_yandexio._tcp.local."
# Endpoint that trades an OAuth token (+ device_id/platform) for a conversation token.
QUASAR_TOKEN_URL: str = "https://quasar.yandex.net/glagol/token"
# Yandex IoT endpoint returning the account's rooms and devices.
YANDEX_INFO_URL: str = "https://api.iot.yandex.net/v1.0/user/info"
@dataclass
class Station:
    """One station: identity used for token requests plus its websocket endpoint."""

    # Device id taken from the mDNS TXT record ("deviceId" or "device_id").
    device_id: str
    # Platform string from the TXT record; sent with device_id to the token endpoint.
    platform: str
    # Address used to build the ``wss://host:port`` URL.
    host: str
    # Port of the station's websocket endpoint.
    port: int
    # Human-friendly name; empty unless the discoverer supplied one.
    name: str = ""
class _Listener:
    """Zeroconf service listener that collects discovered stations.

    An instance is handed to ``ServiceBrowser``, which invokes the
    ``add_service`` / ``update_service`` / ``remove_service`` callbacks.
    Stations are kept in ``stations`` keyed by device id.
    """

    def __init__(self):
        self.stations: Dict[str, Station] = {}
        # mDNS instance name -> device id. The removal callback only carries the
        # instance name, so this is needed to drop the matching station.
        self._device_by_name: Dict[str, str] = {}

    def remove_service(self, zeroconf: Zeroconf, service_type: str, name: str) -> None:
        # The station announced it is leaving: don't hand out a stale endpoint.
        device_id = self._device_by_name.pop(name, None)
        if device_id is not None:
            self.stations.pop(device_id, None)

    def add_service(self, zeroconf: Zeroconf, service_type: str, name: str) -> None:
        self._handle(zeroconf, service_type, name)

    def update_service(self, zeroconf: Zeroconf, service_type: str, name: str) -> None:
        self._handle(zeroconf, service_type, name)

    @staticmethod
    def _decode_properties(raw: Any) -> Dict[str, Any]:
        """Normalise TXT-record properties to ``str`` keys and (mostly) ``str`` values.

        Zeroconf hands properties over as bytes; decoding them here keeps later
        lookups and serialisation uniform. Keys that are not valid UTF-8 are
        skipped; values are decoded leniently (invalid bytes dropped). Non-bytes
        values (e.g. None) are kept as-is.
        """
        props: Dict[str, Any] = {}
        for k, v in (raw or {}).items():
            try:
                key = k.decode("utf-8") if isinstance(k, (bytes, bytearray)) else str(k)
            except UnicodeDecodeError:
                continue
            value = v.decode("utf-8", errors="ignore") if isinstance(v, (bytes, bytearray)) else v
            props[key] = value
        return props

    @staticmethod
    def _pick_ipv4(addresses: Any) -> Optional[str]:
        """Return the first 4-byte (IPv4) address as dotted text, or None.

        Longer (IPv6) entries are skipped: callers build ``wss://host:port``
        URLs without brackets, which only work for IPv4 literals.
        """
        for raw in addresses or ():
            if isinstance(raw, (bytes, bytearray)) and len(raw) == 4:
                return str(ipaddress.IPv4Address(bytes(raw)))
        return None

    def _handle(self, zeroconf: Zeroconf, service_type: str, name: str) -> None:
        """Resolve ``name`` and record it as a Station if it has all required fields.

        Entries without a device id, platform, IPv4 address or non-zero port are
        ignored.
        """
        info = zeroconf.get_service_info(service_type, name)
        if not info:
            return
        props = self._decode_properties(info.properties)
        device_id = str(props.get("deviceId") or props.get("device_id") or "").strip()
        platform = str(props.get("platform") or "").strip()
        if not device_id or not platform:
            return
        host = self._pick_ipv4(getattr(info, "addresses", None))
        if host is None:
            return
        port = int(info.port or 0)
        if not port:
            return
        name_friendly = str(props.get("name") or props.get("deviceName") or name).strip()
        # If this instance name used to advertise another device id, forget the old one.
        previous = self._device_by_name.get(name)
        if previous is not None and previous != device_id:
            self.stations.pop(previous, None)
        self.stations[device_id] = Station(
            device_id=device_id,
            platform=platform,
            host=host,
            port=port,
            name=name_friendly,
        )
        self._device_by_name[name] = device_id
async def discover(timeout_sec: float = 2.5) -> List[Station]:
    """Browse mDNS for ``timeout_sec`` seconds and return the stations found.

    Returns a list snapshot of what the listener collected by the time browsing
    stopped. The Zeroconf instance is always closed, even if the browser fails
    to start.
    """
    zc = Zeroconf()
    listener = _Listener()
    browser = None
    try:
        # Created inside the try so a failing ServiceBrowser cannot leak `zc`.
        browser = ServiceBrowser(zc, MDNS_SERVICE, listener)
        await asyncio.sleep(timeout_sec)
    finally:
        # Best-effort teardown: a failure here must not mask the original error
        # or prevent closing the Zeroconf instance.
        if browser is not None:
            try:
                browser.cancel()
            except Exception:
                pass
        try:
            zc.close()
        except Exception:
            pass
    return list(listener.stations.values())
def _ssl_ctx() -> ssl.SSLContext:
    """Return a TLS context that performs no certificate or hostname checks.

    Used for ``wss://`` connections to stations addressed by bare IP.
    NOTE(review): presumably the station certificates would not validate — confirm.
    """
    tls = ssl.create_default_context()
    # check_hostname has to be switched off before CERT_NONE is accepted.
    tls.check_hostname = False
    tls.verify_mode = ssl.CERT_NONE
    return tls
def _volume01_from_0_100(v: float) -> float:
    """Convert a 0..100 percentage to the 0..1 fraction used by ``setVolume``.

    The result is rounded to three decimals.

    Raises:
        RuntimeError: if ``v`` is outside 0..100 or is NaN.
    """
    # A negated range check: NaN fails every comparison, so it is rejected too.
    if not 0 <= v <= 100:
        raise RuntimeError("volume must be 0..100")
    return round(v / 100.0, 3)
def _speaker_value(text: str, voice: str = "", effect: str = "", audio: str = "") -> str:
    """Build the ``<speaker ...>`` prefix followed by ``text``.

    Each non-empty ``voice`` / ``effect`` / ``audio`` becomes a single-quoted
    attribute, in that order. Values are HTML-escaped with ``quote=True``, so a
    quote inside a value cannot terminate the attribute. ``text`` is appended
    unescaped; a falsy ``text`` contributes nothing.
    """
    pieces = ["<speaker"]
    for attr_name, attr_value in (("voice", voice), ("effect", effect), ("audio", audio)):
        if not attr_value:
            continue
        escaped = html.escape(str(attr_value), quote=True)
        pieces.append(f" {attr_name}='{escaped}'")
    pieces.append(">")
    pieces.append(f"{text or ''}")
    return "".join(pieces)
def pick_station(stations: List[Station], selector: str) -> Station:
    """Return the station matching ``selector`` (surrounding whitespace ignored).

    Three passes are made over ``stations``; the first hit of the earliest
    pass wins:
      1. exact device id;
      2. exact host, or ``host:port``;
      3. case-insensitive substring of the station name.

    Raises:
        RuntimeError: if the selector is blank or nothing matches.
    """
    wanted = (selector or "").strip()
    if not wanted:
        raise RuntimeError("empty selector")
    needle = wanted.lower()
    passes = (
        lambda st: st.device_id == wanted,
        lambda st: wanted in (st.host, f"{st.host}:{st.port}"),
        lambda st: needle in (st.name or "").lower(),
    )
    for matches in passes:
        found = next((st for st in stations if matches(st)), None)
        if found is not None:
            return found
    raise RuntimeError(f"station not found by selector: {wanted}")
async def get_conversation_token(session: aiohttp.ClientSession, oauth_token: str, station: Station) -> str:
    """Request a conversation token for ``station`` from the Quasar Glagol endpoint.

    Args:
        session: HTTP session used for the request.
        oauth_token: Yandex OAuth token, sent as ``Authorization: oauth <token>``.
        station: target device; its ``device_id`` and ``platform`` go in the query.

    Returns:
        The ``token`` field of a response whose ``status`` is ``"ok"``.

    Raises:
        RuntimeError: if the body is not JSON, or is not an object with
            ``status == "ok"`` and a non-empty ``token``. The message embeds the
            truncated body / parsed response, so server error codes stay visible
            to callers.
    """
    params = {"device_id": station.device_id, "platform": station.platform}
    headers = {"Authorization": f"oauth {oauth_token}"}
    timeout = aiohttp.ClientTimeout(total=10)
    async with session.get(QUASAR_TOKEN_URL, params=params, headers=headers, timeout=timeout) as r:
        text = await r.text()
    try:
        data = json.loads(text)
    except ValueError as exc:
        raise RuntimeError(f"Quasar token: bad json: {text[:200]}") from exc
    # A JSON array or scalar would otherwise fail with an opaque AttributeError on .get().
    if not isinstance(data, dict) or data.get("status") != "ok" or not data.get("token"):
        raise RuntimeError(f"Quasar token: unexpected response: {data}")
    return str(data["token"])
async def ws_send(session: aiohttp.ClientSession, station: Station, conv_token: str, payload: dict, wait_sec: float = 5.0) -> dict:
    """Open a one-shot websocket to ``station``, send ``payload`` and await the reply.

    Returns:
        The first JSON object whose ``requestId`` or ``id`` equals the generated
        request id, or ``{"status": "timeout"}`` if none arrives within
        ``wait_sec`` seconds in total, or if the socket closes or errors first.
    """
    url = f"wss://{station.host}:{station.port}"
    req_id = str(uuid.uuid4())
    msg = {
        "conversationToken": conv_token,
        "id": req_id,
        "payload": payload,
        "sentTime": int(round(time.time() * 1000)),
    }
    async with session.ws_connect(url, ssl=_ssl_ctx(), heartbeat=55, timeout=aiohttp.ClientTimeout(total=10)) as ws:
        await ws.send_json(msg)
        # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
        deadline = time.monotonic() + wait_sec
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                # Wait only for what is left of the overall budget.
                in_msg = await ws.receive(timeout=remaining)
            except asyncio.TimeoutError:
                break
            if in_msg.type == aiohttp.WSMsgType.TEXT:
                try:
                    data = json.loads(in_msg.data)
                except ValueError:
                    continue
                if isinstance(data, dict) and (data.get("requestId") == req_id or data.get("id") == req_id):
                    return data
            elif in_msg.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                break
    return {"status": "timeout"}
async def do_list(args) -> int:
    """CLI ``list``: discover stations over mDNS and print one tab-separated line each.

    Uses ``args.timeout`` as the discovery window. Always returns 0.
    """
    found = await discover(args.timeout)
    lines = (
        f"{st.name}\tdevice_id={st.device_id}\tplatform={st.platform}\t{st.host}:{st.port}"
        for st in found
    )
    for line in lines:
        print(line)
    return 0
async def do_stations_cloud(args) -> int:
    """CLI ``stations-cloud``: list the account's speakers via the Yandex IoT API.

    Prints a JSON array of ``{"name", "id", "room", "model"}`` objects for each
    device whose ``type`` contains "speaker". ``room`` is the room's name, or ""
    when unknown. Returns 0.

    Raises:
        RuntimeError: no OAuth token (``--token`` / ``YANDEX_TOKEN``), or the API
            answered with an HTTP error or a non-object body (instead of silently
            printing an empty list).
    """
    oauth = args.token or os.environ.get("YANDEX_TOKEN", "").strip()
    if not oauth:
        raise RuntimeError("No token. Use --token or env YANDEX_TOKEN")
    async with aiohttp.ClientSession() as session:
        headers = {"Authorization": f"OAuth {oauth}"}
        async with session.get(YANDEX_INFO_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=15)) as r:
            status = r.status
            data = await r.json(content_type=None)
    if status >= 400 or not isinstance(data, dict):
        raise RuntimeError(f"Yandex IoT user/info failed: HTTP {status}: {str(data)[:200]}")

    rooms: Dict[str, str] = {}
    for rm in data.get("rooms") or []:
        if not isinstance(rm, dict):
            continue
        rid = str(rm.get("id") or "")
        if rid:
            rooms[rid] = str(rm.get("name") or "")

    out = []
    for d in data.get("devices") or []:
        if not isinstance(d, dict):
            continue
        tp = str(d.get("type") or "").lower()
        # Substring match also covers "smart_speaker" types.
        if "speaker" not in tp:
            continue
        rid = str(d.get("room") or "")
        out.append({
            "name": str(d.get("name") or ""),
            "id": str(d.get("id") or ""),
            "room": rooms.get(rid, ""),
            "model": str(d.get("model") or d.get("type") or ""),
        })
    print(json.dumps(out, ensure_ascii=False, indent=2))
    return 0
class StationConn:
    """Long-lived connection to a single station.

    Holds:
    - the websocket and the current conversation token;
    - a cache of player state (playing / volume) so status requests can be
      answered quickly;
    - an ``asyncio.Lock`` so concurrent commands never interleave on one
      websocket (a command holds it until its reply arrives or times out).
    """

    # Lower-cased strings understood as "playing" / "not playing".
    _PLAYING_WORDS = frozenset({"playing", "play", "started", "start", "on", "true", "yes"})
    _STOPPED_WORDS = frozenset({"paused", "stopped", "stop", "idle", "off", "false", "no"})

    def __init__(self, station: Station, oauth_token: str, wait_sec: float):
        self.station = station
        self.oauth_token = oauth_token
        # Maximum total time, in seconds, send() waits for a reply.
        self.wait_sec = wait_sec
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._conv_token: str = ""
        # time.monotonic() value after which the conversation token is refetched.
        self._conv_exp_ts: float = 0.0
        self._lock = asyncio.Lock()
        self._playing: Optional[int] = None  # 1/0, or None if never observed
        self._volume: Optional[int] = None  # percent, or None if unknown
        # -inf so the very first probe_state() call is never rate-limited.
        self._last_probe_ts: float = float("-inf")

    @property
    def playing(self) -> Optional[int]:
        """Cached playback flag: 1 playing, 0 not playing, None unknown."""
        if self._playing is None:
            return None
        return 1 if self._playing else 0

    @property
    def volume(self) -> Optional[int]:
        """Cached volume in percent clamped to 0..100, or None if unknown."""
        if self._volume is None:
            return None
        return max(0, min(100, int(self._volume)))

    @staticmethod
    def _to_playing_flag(value: Any) -> Optional[int]:
        """Map a loosely typed "is playing" value to 1/0, or None if unrecognised.

        Accepts bools; numbers 0/1 (floats are rounded first); 2 is also read as
        playing (NOTE(review): presumably a player-state enum value — confirm);
        words from the two word sets; numeric strings (truncated toward zero).
        Non-finite floats (NaN/inf, which ``json.loads`` accepts) are unrecognised.
        """
        if isinstance(value, bool):
            return 1 if value else 0
        if isinstance(value, (int, float)):
            if isinstance(value, float):
                if not math.isfinite(value):
                    return None
                v = int(round(value))
            else:
                v = int(value)
            if v == 2:
                return 1
            if v in (0, 1):
                return v
            return None
        if isinstance(value, str):
            low = value.strip().lower()
            if low in StationConn._PLAYING_WORDS:
                return 1
            if low in StationConn._STOPPED_WORDS:
                return 0
            try:
                vv = int(float(low))
            except (ValueError, OverflowError):
                return None
            if vv == 2:
                return 1
            if vv in (0, 1):
                return vv
        return None

    @classmethod
    def _infer_playing(cls, obj: Any) -> Optional[int]:
        """Depth-first search of decoded JSON for the first recognisable playing flag.

        In each dict the known keys are tried first (in order); if none yields a
        flag, every value is searched recursively. Lists are searched in order.
        """
        if isinstance(obj, dict):
            for k in ("playing", "isPlaying", "playState", "playerState", "state"):
                if k in obj:
                    out = cls._to_playing_flag(obj.get(k))
                    if out is not None:
                        return out
            for v in obj.values():
                out = cls._infer_playing(v)
                if out is not None:
                    return out
        elif isinstance(obj, list):
            for v in obj:
                out = cls._infer_playing(v)
                if out is not None:
                    return out
        return None

    @staticmethod
    def _to_volume_percent(value: Any) -> Optional[int]:
        """Normalise a volume reading to an integer percent in 0..100.

        Numbers within [0, 1] are treated as fractions (1 -> 100, 0.5 -> 50);
        other numbers as percent. Results are clamped to 0..100. Numeric strings
        (comma decimal separator allowed) are parsed the same way; bools map to
        100/0. Returns None for anything else, including NaN/inf.
        """
        if isinstance(value, bool):
            return 100 if value else 0
        if isinstance(value, (int, float)):
            try:
                v = float(value)
            except OverflowError:
                # An int too large for a float: clamp like any out-of-range level.
                return 100 if value > 0 else 0
            if not math.isfinite(v):
                return None
            if 0.0 <= v <= 1.0:
                v *= 100.0
            return max(0, min(100, int(round(v))))
        if isinstance(value, str):
            s = value.strip().replace(",", ".")
            if not s:
                return None
            try:
                number = float(s)
            except ValueError:
                return None
            return StationConn._to_volume_percent(number)
        return None

    @classmethod
    def _infer_volume(cls, obj: Any) -> Optional[int]:
        """Depth-first search of decoded JSON for the first recognisable volume.

        Same traversal as ``_infer_playing``, using the volume-related keys.
        """
        if isinstance(obj, dict):
            for k in ("volume", "currentVolume", "volumeLevel", "level"):
                if k in obj:
                    out = cls._to_volume_percent(obj.get(k))
                    if out is not None:
                        return out
            for v in obj.values():
                out = cls._infer_volume(v)
                if out is not None:
                    return out
        elif isinstance(obj, list):
            for v in obj:
                out = cls._infer_volume(v)
                if out is not None:
                    return out
        return None

    async def start(self):
        """Make sure the HTTP session and websocket are open.

        Takes the command lock so a concurrent send() cannot open a second
        websocket at the same time (the later one would overwrite and leak the
        first).
        """
        async with self._lock:
            await self._ensure_ws()

    async def close(self):
        """Close the websocket and HTTP session. Best effort: never raises."""
        ws, self._ws = self._ws, None
        session, self._session = self._session, None
        if ws is not None:
            try:
                await ws.close()
            except Exception:
                pass
        if session is not None:
            try:
                await session.close()
            except Exception:
                pass

    def _get_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, creating it when missing or closed."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def _ensure_conv_token(self):
        """Fetch a new conversation token if none is cached or it is 60 s old.

        The token is treated as short-lived and refreshed proactively so a command
        is not rejected midway.
        """
        now = time.monotonic()
        if self._conv_token and now < self._conv_exp_ts:
            return
        self._conv_token = await get_conversation_token(self._get_session(), self.oauth_token, self.station)
        self._conv_exp_ts = now + 60.0

    async def _ensure_ws(self):
        """Refresh the token if needed and (re)open the websocket if it is closed."""
        session = self._get_session()
        await self._ensure_conv_token()
        if self._ws is not None and not self._ws.closed:
            return
        url = f"wss://{self.station.host}:{self.station.port}"
        self._ws = await session.ws_connect(
            url,
            ssl=_ssl_ctx(),
            heartbeat=55,
            timeout=aiohttp.ClientTimeout(total=10),
        )

    async def _send_with_reconnect(self, msg: dict) -> aiohttp.ClientWebSocketResponse:
        """Send ``msg``; if that fails, reconnect once and resend. Returns the ws used."""
        try:
            if self._ws is None:
                raise ConnectionError("websocket is not connected")
            await self._ws.send_json(msg)
            return self._ws
        except Exception:
            # The socket may have dropped between commands: do one controlled
            # reconnect and a single resend.
            try:
                if self._ws is not None:
                    await self._ws.close()
            except Exception:
                pass
            self._ws = None
            await self._ensure_ws()
            # _ensure_ws may have fetched a new conversation token.
            msg["conversationToken"] = self._conv_token
            ws = self._ws
            if ws is None:
                raise ConnectionError("websocket reconnect failed")
            await ws.send_json(msg)
            return ws

    async def _await_reply(self, ws: aiohttp.ClientWebSocketResponse, req_id: str) -> Optional[dict]:
        """Read frames until a JSON object carrying ``req_id`` arrives.

        Frames for other requests and non-JSON text are skipped. Returns None
        once ``wait_sec`` (total) elapses, or when the socket closes/errors.
        """
        deadline = time.monotonic() + self.wait_sec
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
            try:
                in_msg = await ws.receive(timeout=remaining)
            except Exception:
                # Timeout or a broken socket: report it as a timeout to the caller.
                return None
            if in_msg.type == aiohttp.WSMsgType.TEXT:
                try:
                    data = json.loads(in_msg.data)
                except ValueError:
                    continue
                if isinstance(data, dict) and (data.get("requestId") == req_id or data.get("id") == req_id):
                    return data
            elif in_msg.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                return None

    def _update_cache(self, reply: dict, cmd: str, sent_volume: Optional[int]) -> None:
        """Refresh cached playing/volume from ``reply``, else from the command sent."""
        inferred = self._infer_playing(reply)
        if inferred is not None:
            self._playing = inferred
        elif cmd == "play":
            self._playing = 1
        elif cmd == "stop":
            self._playing = 0
        inferred_volume = self._infer_volume(reply)
        if inferred_volume is not None:
            self._volume = inferred_volume
        elif sent_volume is not None:
            self._volume = sent_volume

    async def send(self, payload: dict) -> dict:
        """Send ``payload`` to the station and wait for the reply with the same id.

        A matching reply also refreshes the cached playing/volume state. For
        ``setVolume`` the requested level is cached optimistically up front.

        Returns:
            The reply object, or ``{"status": "timeout", "id": <request id>}``
            when no matching reply arrives in time or the socket closes first.
        """
        async with self._lock:
            await self._ensure_ws()
            req_id = str(uuid.uuid4())
            cmd = str(payload.get("command") or "").strip().lower()
            sent_volume = None
            if cmd == "setvolume":
                sent_volume = self._to_volume_percent(payload.get("volume"))
                if sent_volume is not None:
                    # Optimistic update: status can expose the requested level immediately.
                    self._volume = sent_volume
            msg = {
                "conversationToken": self._conv_token,
                "id": req_id,
                "payload": payload,
                "sentTime": int(round(time.time() * 1000)),
            }
            ws = await self._send_with_reconnect(msg)
            reply = await self._await_reply(ws, req_id)
            if reply is None:
                if sent_volume is not None:
                    self._volume = sent_volume
                return {"status": "timeout", "id": req_id}
            self._update_cache(reply, cmd, sent_volume)
            return reply

    async def probe_state(self) -> None:
        """Try to refresh playing/volume by sending status-like commands.

        Runs at most once per 5 s. Candidates are tried in order until one reply
        yields a playing or volume value; send failures are ignored.
        NOTE(review): which of these commands the station supports is not known
        from this file — confirm.
        """
        now = time.monotonic()
        if now - self._last_probe_ts < 5.0:
            return
        self._last_probe_ts = now
        candidates = [
            {"command": "getState"},
            {"command": "getStatus"},
            {"command": "status"},
            {"command": "playerState"},
        ]
        for payload in candidates:
            try:
                data = await self.send(payload)
            except Exception:
                continue
            if self._infer_playing(data) is not None or self._infer_volume(data) is not None:
                break
async def serve_daemon(args) -> int:
    """Run the daemon: discover stations, connect, and serve the TCP and HTTP APIs.

    TCP (``args.host``:``args.port``): one JSON object per line in, one JSON
    line back, then the connection is closed.
    HTTP (``args.http_host``:``args.http_port``): ``GET /health``,
    ``GET /api/status?station=...``, ``GET|POST /api/exec``.
    Runs until the TCP server is cancelled. Station connections, the HTTP runner
    and the TCP server are torn down on exit, including on startup failure.

    Raises:
        RuntimeError: missing OAuth token, no stations found/selected, a
            ``--stations`` entry that matches nothing, or an invalid OAuth token
            when connecting.
    """
    oauth = args.token or os.environ.get("YANDEX_TOKEN", "").strip()
    if not oauth:
        raise RuntimeError("No token. Use --token or env YANDEX_TOKEN")
    stations = await discover(args.timeout)
    selected: List[Station] = []
    if args.stations:
        seen_ids = set()
        for raw_sel in args.stations.split(","):
            sel = raw_sel.strip()
            if not sel:
                # Tolerate "a,,b" and trailing commas.
                continue
            st = pick_station(stations, sel)
            # Two selectors resolving to one device would start two connections
            # under the same key and leak the first.
            if st.device_id in seen_ids:
                continue
            seen_ids.add(st.device_id)
            selected.append(st)
    else:
        selected = stations
    if not selected:
        raise RuntimeError("No stations found")

    conns: Dict[str, StationConn] = {}

    async def close_all() -> None:
        """Close every connection; one failure must not stop the others."""
        for c in conns.values():
            try:
                await c.close()
            except Exception:
                pass

    try:
        for st in selected:
            conn = StationConn(st, oauth, args.wait)
            conns[st.device_id] = conn
            await conn.start()
    except Exception as exc:
        await close_all()
        if "AUTH_TOKEN_INVALID" in str(exc):
            raise RuntimeError("YANDEX_OAUTH_TOKEN invalid: update token and retry") from exc
        raise

    async def resolve_conn(station_sel: str) -> Optional[StationConn]:
        """Find a live connection by selector; None if blank or unmatched.

        Same priority as pick_station: exact device id, then exact host or
        host:port, then case-insensitive substring of the station name.
        """
        sel = (station_sel or "").strip()
        if not sel:
            return None
        conn = conns.get(sel)  # conns is keyed by device id
        if conn is not None:
            return conn
        for c in conns.values():
            if sel in (c.station.host, f"{c.station.host}:{c.station.port}"):
                return c
        low = sel.lower()
        for c in conns.values():
            if low in (c.station.name or "").lower():
                return c
        return None

    async def execute_action(req: Dict[str, Any]) -> Dict[str, Any]:
        """Validate a request dict, translate it to station payload(s) and send.

        Returns ``{"ok": True, "result": ...}`` or ``{"ok": False, "error": ...}``;
        exceptions are reported in the error field rather than raised.
        """
        station_sel = str(req.get("station") or "").strip()
        if not station_sel:
            return {"ok": False, "error": "station is required"}
        action = str(req.get("action") or "").strip().lower()
        conn = await resolve_conn(station_sel)
        if conn is None:
            return {"ok": False, "error": "station not found"}
        try:
            if action == "tts":
                text = str(req.get("text") or "").strip()
                tts_voice = str(req.get("tts_voice") or "").strip()
                tts_effect = str(req.get("tts_effect") or "").strip()
                tts_audio = str(req.get("tts_audio") or "").strip()
                if tts_audio:
                    # Keep only the file name; directory parts are dropped.
                    tts_audio = os.path.basename(tts_audio.replace("\\", "/"))
                if not text and not tts_audio:
                    raise RuntimeError("text or tts_audio is required")
                volume = req.get("volume")
                if volume is not None and str(volume).strip() != "":
                    vol_payload = {"command": "setVolume", "volume": _volume01_from_0_100(float(volume))}
                    await conn.send(vol_payload)
                speaker_value = _speaker_value(text, voice=tts_voice, effect=tts_effect, audio=tts_audio)
                payload = {
                    "command": "serverAction",
                    "serverActionEventPayload": {
                        "type": "server_action",
                        "name": "update_form",
                        "payload": {
                            "form_update": {
                                "name": "personal_assistant.scenarios.repeat_after_me",
                                "slots": [{"type": "string", "name": "request", "value": speaker_value}],
                            },
                            "resubmit": True,
                        },
                    },
                }
            elif action == "command":
                text = str(req.get("text") or "").strip()
                if not text:
                    raise RuntimeError("text is required")
                payload = {"command": "sendText", "text": text}
            elif action == "player":
                cmd = str(req.get("cmd") or "").strip().lower()
                if cmd not in {"play", "stop", "next", "prev"}:
                    raise RuntimeError("cmd must be play|stop|next|prev")
                payload = {"command": cmd}
            elif action == "volume":
                level = req.get("level")
                if level is None or str(level).strip() == "":
                    raise RuntimeError("level is required")
                payload = {"command": "setVolume", "volume": _volume01_from_0_100(float(level))}
            elif action == "raw":
                payload = req.get("payload")
                if isinstance(payload, list):
                    if not payload:
                        raise RuntimeError("payload list is empty")
                    results = []
                    for idx, p in enumerate(payload):
                        if not isinstance(p, dict):
                            raise RuntimeError(f"payload[{idx}] must be object")
                        results.append(await conn.send(p))
                    return {"ok": True, "result": results}
                if not isinstance(payload, dict):
                    raise RuntimeError("payload must be object or array of objects")
            else:
                raise RuntimeError("action must be tts|command|player|volume|raw")
            result = await conn.send(payload)
            return {"ok": True, "result": result}
        except Exception as e:
            return {"ok": False, "error": str(e)}

    async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        """Serve one TCP request: read a JSON line, reply with one JSON line, close."""

        async def reply(obj: Dict[str, Any]) -> None:
            writer.write((json.dumps(obj, ensure_ascii=False) + "\n").encode("utf-8"))
            await writer.drain()

        try:
            try:
                line = await reader.readline()
                if not line:
                    return
                req = json.loads(line.decode("utf-8", errors="ignore"))
                if not isinstance(req, dict):
                    raise ValueError("request must be a JSON object")
            except Exception as e:
                await reply({"ok": False, "error": f"bad request: {e}"})
                return
            await reply(await execute_action(req))
        except OSError:
            # The client went away (reset/broken pipe); nobody left to answer.
            pass
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except OSError:
                pass

    async def http_exec(request: web.Request) -> web.Response:
        """HTTP wrapper over execute_action: merge JSON/form body and query into one dict.

        Query parameters override body fields. A string ``payload`` is parsed as
        JSON. Returns 400 for malformed JSON, 422 when the action fails.
        """
        data: Dict[str, Any] = {}
        if request.method == "POST":
            if request.content_type and "application/json" in request.content_type:
                try:
                    body = await request.json()
                except ValueError:
                    return web.json_response({"ok": False, "error": "bad json body"}, status=400)
                if isinstance(body, dict):
                    data.update(body)
            else:
                body = await request.post()
                data.update(body.items())
        data.update(request.query.items())
        if "payload" in data and isinstance(data["payload"], str):
            payload_raw = str(data["payload"]).strip()
            if payload_raw:
                try:
                    data["payload"] = json.loads(payload_raw)
                except Exception:
                    return web.json_response({"ok": False, "error": "bad payload json"}, status=400)
        result = await execute_action(data)
        return web.json_response(result, status=200 if result.get("ok") else 422)

    async def http_status(request: web.Request) -> web.Response:
        """Report connection state plus the cached playing/volume for one station."""
        station_sel = str(request.query.get("station") or "").strip()
        conn = await resolve_conn(station_sel)
        if conn is None:
            return web.json_response({"ok": False, "error": "station not found"}, status=404)
        # Keep the websocket persistent: try to restore it on each status poll.
        try:
            await conn.start()
        except Exception:
            pass
        try:
            await conn.probe_state()
        except Exception:
            pass
        running = 1 if (conn._ws is not None and not conn._ws.closed) else 0
        return web.json_response({
            "ok": 1,
            "running": running,
            "playing": conn.playing,
            "volume": conn.volume,
            "station": conn.station.device_id,
        })

    async def http_health(_request: web.Request) -> web.Response:
        return web.json_response({"ok": 1})

    app = web.Application()
    app.router.add_get("/health", http_health)
    app.router.add_get("/api/status", http_status)
    app.router.add_get("/api/exec", http_exec)
    app.router.add_post("/api/exec", http_exec)
    http_runner = web.AppRunner(app)

    server = None
    try:
        server = await asyncio.start_server(handle_client, host=args.host, port=args.port)
        addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets or [])
        print(f"tcp server listening on {addrs}")
        await http_runner.setup()
        http_site = web.TCPSite(http_runner, host=args.http_host, port=args.http_port)
        await http_site.start()
        print(f"http server listening on {(args.http_host, args.http_port)}")
        async with server:
            await server.serve_forever()
    finally:
        # Also reached when startup fails part-way, so nothing is leaked.
        if server is not None:
            server.close()
        try:
            await http_runner.cleanup()
        except Exception:
            pass
        await close_all()
    return 0
async def client_call(args) -> int:
    """CLI ``client``: send one request to a running daemon over TCP and print the reply.

    The request carries ``station`` and ``action`` plus the action-specific
    field (``text``, ``cmd``, ``level`` or parsed ``payload``).

    Returns:
        0 when a reply line was printed (whatever its ``ok`` value), 2 when the
        daemon closed the connection without replying.
    """
    req: Dict[str, Any] = {"station": args.station, "action": args.action}
    if args.action in {"tts", "command"}:
        req["text"] = args.text
    elif args.action == "player":
        req["cmd"] = args.cmd
    elif args.action == "volume":
        req["level"] = float(args.level)
    elif args.action == "raw":
        req["payload"] = json.loads(args.payload)
    reader, writer = await asyncio.open_connection(args.host, args.port)
    try:
        writer.write((json.dumps(req, ensure_ascii=False) + "\n").encode("utf-8"))
        await writer.drain()
        line = await reader.readline()
    finally:
        # Close the socket even if the daemon reset the connection mid-exchange.
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            pass
    if not line:
        print(json.dumps({"ok": False, "error": "no response"}, ensure_ascii=False))
        return 2
    print(line.decode("utf-8", errors="ignore").rstrip("\n"))
    return 0
def build_parser() -> argparse.ArgumentParser:
    """Build the CLI: global options plus list / stations-cloud / serve / client.

    Every subcommand stores its coroutine in ``fn``; ``main`` awaits
    ``args.fn(args)``. The ``client`` subcommand's positional ``action`` shares
    the subparser ``dest``. client_call reads ``args.action`` as the request
    action, so it relies on the sub-parser's value replacing the subcommand name.
    """
    parser = argparse.ArgumentParser(prog="yastation.py", description="Yandex station helper")
    parser.add_argument("--timeout", type=float, default=2.5)
    parser.add_argument("--token", type=str, default="")
    parser.add_argument("--wait", type=float, default=5.0)
    commands = parser.add_subparsers(dest="action", required=True)

    commands.add_parser("list").set_defaults(fn=do_list)
    commands.add_parser("stations-cloud").set_defaults(fn=do_stations_cloud)

    serve = commands.add_parser("serve")
    serve.add_argument("--host", default="127.0.0.1")
    serve.add_argument("--port", type=int, default=9123)
    serve.add_argument("--http-host", default="0.0.0.0")
    serve.add_argument("--http-port", type=int, default=9124)
    serve.add_argument("--stations", default="")
    serve.set_defaults(fn=serve_daemon)

    client = commands.add_parser("client")
    client.add_argument("--host", default="127.0.0.1")
    client.add_argument("--port", type=int, default=9123)
    client.add_argument("--station", required=True)
    client.add_argument("action", help="tts|command|player|volume|raw")
    for flag, default in (("--text", ""), ("--cmd", ""), ("--level", "0"), ("--payload", "{}")):
        client.add_argument(flag, default=default)
    client.set_defaults(fn=client_call)
    return parser
def main() -> int:
    """CLI entry point: parse arguments, run the chosen coroutine, return an exit code.

    Returns the coroutine's result, 130 on Ctrl-C, or 1 after printing
    ``error: <message>`` to stderr for a RuntimeError. Subcommands raise
    RuntimeError for user-facing failures such as a missing token, which
    previously surfaced as a full traceback.
    """
    args = build_parser().parse_args()
    try:
        return asyncio.run(args.fn(args))
    except KeyboardInterrupt:
        return 130
    except RuntimeError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)