cit-is-bot-backend/web/main.py

87 строки
1.7 KiB
Python

import asyncio
import os
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastapi.websockets import WebSocket, WebSocketDisconnect
from jinja2 import Environment, FileSystemLoader
import config
class ConnectionManager:
connections: list[WebSocket]
class StateError(Exception):
pass
def __init__(
self,
):
self.connections = []
async def connect(
self,
websocket: WebSocket,
):
await websocket.accept()
self.connections.append(websocket)
def disconnect(
self,
websocket: WebSocket,
):
self.connections.remove(websocket)
async def broadcast(
self,
data: dict,
):
for connection in self.connections:
asyncio.ensure_future(connection.send_json(data))
connection_manager = ConnectionManager()
app = FastAPI(
title=config.Main.app_name,
)
env = Environment(
loader=FileSystemLoader(
searchpath=os.path.join(
config.Main.cwd,
'templates',
),
),
enable_async=True,
)
env.globals['app_name'] = config.Main.app_name
@app.get(
path='/',
)
async def _():
template = env.get_template('main.jinja2')
return HTMLResponse(
content=await template.render_async(
title='Дашборд',
),
)
@app.websocket(
path='/ws',
)
async def _(
websocket: WebSocket,
):
await connection_manager.connect(websocket)
try:
while True:
await connection_manager.broadcast(await websocket.receive_json())
except WebSocketDisconnect:
connection_manager.disconnect(websocket)