cit-is-bot-backend/api/main.py

81 строка
1.7 KiB
Python
Исходник Обычный вид История

import asyncio
import os
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastapi.websockets import WebSocket, WebSocketDisconnect
import config
class ConnectionManager:
connections: list[WebSocket]
class StateError(Exception):
pass
def __init__(
self,
):
self.connections = []
async def connect(
self,
websocket: WebSocket,
):
await websocket.accept()
self.connections.append(websocket)
def disconnect(
self,
websocket: WebSocket,
):
self.connections.remove(websocket)
async def broadcast(
self,
data: any,
):
for connection in self.connections:
asyncio.ensure_future(connection.send_json(data))
connection_manager = ConnectionManager()
api = FastAPI(
title=config.Main.app_name,
)
scripts = [
{
'id': 1,
'name': 'Текущее состояние сотрудников',
'time': '0 11 * * 1-5',
'messageNumber': '1 вопрос',
'isEnabled': True,
},
{
'id': 2,
'name': 'Планы на обед',
'time': '45 11 * * 1-5',
'messageNumber': '2 вопроса',
'isEnabled': False,
},
]
@api.websocket(
path='/ws/scripts',
)
async def _(
websocket: WebSocket,
):
await connection_manager.connect(websocket)
try:
await connection_manager.broadcast(scripts)
while True:
await connection_manager.broadcast(await websocket.receive_json())
except WebSocketDisconnect:
connection_manager.disconnect(websocket)