"""HTTP API for poll management, plus a websocket connection manager."""

import asyncio
import logging
import os

from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.websockets import WebSocket, WebSocketDisconnect

import config


logger = logging.getLogger(__name__)


class ConnectionManager:
    """Keep track of accepted websockets and fan JSON messages out to them."""

    # Websockets that have been accepted and not yet disconnected.
    connections: list[WebSocket]

    class StateError(Exception):
        """Error type reserved for invalid manager state (not raised here)."""

    def __init__(
        self,
    ) -> None:
        self.connections = []

    async def connect(
        self,
        websocket: WebSocket,
    ) -> None:
        """Accept *websocket* and register it for future broadcasts."""
        await websocket.accept()
        self.connections.append(websocket)

    def disconnect(
        self,
        websocket: WebSocket,
    ) -> None:
        """Unregister *websocket*.

        Raises:
            ValueError: if *websocket* is not currently registered.
        """
        self.connections.remove(websocket)

    async def broadcast(
        self,
        data: dict,
    ) -> None:
        """Send *data* as JSON to every registered websocket.

        All sends run concurrently and are awaited before returning, so no
        un-referenced background task is left behind. A failure on one
        connection is logged and does not prevent delivery to the others.
        """
        # Snapshot the list: connect()/disconnect() may run while we await.
        recipients = list(self.connections)
        if not recipients:
            return

        results = await asyncio.gather(
            *(connection.send_json(data) for connection in recipients),
            return_exceptions=True,
        )
        for result in results:
            if isinstance(result, BaseException):
                logger.warning('Broadcast to a websocket failed: %r', result)


connection_manager = ConnectionManager()
api = FastAPI(
    title=config.Main.app_name,
)

# In-memory poll storage; contents are lost when the process restarts.
polls = [
    {
        'id': 1,
        'name': 'Текущее состояние сотрудников',
        'daysOfWeek': 'ПН-ПТ',
        'time': '11:00',
        'questionNumber': '1 вопрос',
        'isEnabled': True,
    },
    {
        'id': 2,
        'name': 'Планы на обед',
        'daysOfWeek': 'ПН-ПТ',
        'time': '11:45-12:00',
        'questionNumber': '2 вопроса',
        'isEnabled': False,
    },
]


@api.get(
    path='/api/polls',
)
async def _():
    """Return the full list of polls."""
    return polls


@api.put(
    path='/api/polls',
)
async def _(
    poll: dict,
):
    """Replace the stored poll whose ``id`` equals ``poll['id']``.

    Raises:
        HTTPException: 404 when no stored poll has the given ``id``
            (including when the request body carries no ``id`` at all).
    """
    poll_id = poll.get('id')
    replaced = False
    for index, stored in enumerate(polls):
        if stored['id'] == poll_id:
            polls[index] = poll
            replaced = True

    if not replaced:
        # Previously an unknown id was silently ignored with a 200 response.
        raise HTTPException(status_code=404, detail='Poll not found')


# NOTE: the websocket sync endpoint below is disabled. It refers to `app`,
# while the application object in this module is named `api`.
# @app.websocket(
#     path='/ws/sync',
# )
# async def _(
#     websocket: WebSocket,
# ):
#     await connection_manager.connect(websocket)
#     try:
#         while True:
#             await connection_manager.broadcast(await websocket.receive_json())
#     except WebSocketDisconnect:
#         connection_manager.disconnect(websocket)