"""HTTP and WebSocket API over an in-memory list of scripts.

Scripts live in the module-level ``scripts`` list. A script's id is its
index in that list; deleting a script replaces its slot with ``None`` so the
ids of the remaining scripts never shift.
"""

import asyncio
from typing import Any, Callable, Optional

from fastapi import FastAPI, Request, Response, Path, Body
from fastapi.websockets import WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse

import config


class ConnectionManager:
    """Keep track of accepted WebSocket connections and broadcast to them."""

    # Currently registered (accepted, not yet disconnected) sockets.
    connections: list[WebSocket]

    class StateError(Exception):
        """Not raised anywhere in this module; kept for external users."""

    def __init__(
        self,
    ):
        self.connections = []

    async def connect(
        self,
        websocket: WebSocket,
    ):
        """Accept the WebSocket handshake and register the connection."""
        await websocket.accept()
        self.connections.append(websocket)

    def disconnect(
        self,
        websocket: WebSocket,
    ):
        """Unregister *websocket*; do nothing if it is not registered."""
        # Guarded so that cleanup paths may call this unconditionally.
        if websocket in self.connections:
            self.connections.remove(websocket)

    async def broadcast(
        self,
        data: Any,
    ):
        """Send *data* as JSON to every registered connection.

        The sends run concurrently and are awaited here. A failure on one
        connection does not prevent delivery to the others; failed sends are
        ignored.
        """
        # Snapshot the list: connections may be added or removed while the
        # sends are in flight.
        targets = list(self.connections)
        if not targets:
            return
        # gather() keeps a reference to every send until it completes; a bare
        # ensure_future() task can be garbage-collected before it finishes.
        await asyncio.gather(
            *(connection.send_json(data) for connection in targets),
            return_exceptions=True,
        )


api = FastAPI(
    title=config.Main.app_name,
)

# In-memory script storage. Index == script id; ``None`` marks a deleted slot.
# NOTE(review): the 'time' values look like cron expressions - confirm.
scripts = [
    {
        'id': 0,
        'name': 'Текущее состояние сотрудников',
        'time': '0 11 * * 1-5',
        'messageNumber': '1 вопрос',
        'isEnabled': True,
    },
    {
        'id': 1,
        'name': 'Планы на обед',
        'time': '45 11 * * 1-5',
        'messageNumber': '2 вопроса',
        'isEnabled': False,
    },
]
scripts_cm = ConnectionManager()


def _find_script(
    script_id: int,
) -> Optional[dict]:
    """Return the live script stored under *script_id*, or ``None``.

    ``None`` is returned for ids outside the list (including negative ids,
    which would otherwise index from the end) and for deleted slots.
    """
    if 0 <= script_id < len(scripts):
        return scripts[script_id]
    return None


def _script_not_found() -> JSONResponse:
    """Build the 404 response used when a script id cannot be resolved."""
    return JSONResponse(
        content={'detail': 'Script not found'},
        status_code=404,
    )


@api.middleware('http')
async def function(
    request: Request,
    callback: Callable,
):
    """Append permissive CORS headers to every HTTP response."""
    response: Response = await callback(request)
    response.headers.append('Access-Control-Allow-Methods', '*')
    response.headers.append('Access-Control-Allow-Headers', '*')
    response.headers.append('Access-Control-Allow-Origin', '*')
    return response


@api.options(
    path='/{path:path}',
)
async def _(
    path: str,
):
    """Answer OPTIONS requests for any path with an empty response.

    The CORS headers are added by the HTTP middleware.
    """
    return Response()


@api.get(
    path='/api/scripts',
)
async def _():
    """Return all scripts that have not been deleted."""
    return JSONResponse(
        content=[
            script
            for script in scripts
            if script is not None
        ],
        status_code=200,
    )


@api.post(
    path='/api/scripts',
)
async def _(
    name: str = Body(),
    time: str = Body(),
    message_number: str = Body(
        alias='messageNumber',
        validation_alias='messageNumber',
        serialization_alias='messageNumber',
    ),
    is_enabled: bool = Body(
        alias='isEnabled',
        validation_alias='isEnabled',
        serialization_alias='isEnabled',
    ),
):
    """Create a script from the request body and append it to the list."""
    # Deleted slots are kept as None, so len(scripts) is always a fresh id.
    # The dict must be stored as-is: a trailing comma after the literal would
    # wrap it in a one-element tuple.
    script = {
        'id': len(scripts),
        'name': name,
        'time': time,
        'messageNumber': message_number,
        'isEnabled': is_enabled,
    }
    scripts.append(script)
    return Response(
        status_code=201,
    )


@api.put(
    path='/api/scripts/{script_id}',
)
async def _(
    script_id: int = Path(),
    name: str = Body(),
    time: str = Body(),
    message_number: str = Body(
        alias='messageNumber',
        validation_alias='messageNumber',
        serialization_alias='messageNumber',
    ),
    is_enabled: bool = Body(
        alias='isEnabled',
        validation_alias='isEnabled',
        serialization_alias='isEnabled',
    ),
):
    """Overwrite the fields of an existing script.

    Responds with 404 when the id is unknown or the script was deleted.
    """
    script = _find_script(script_id)
    if script is None:
        return _script_not_found()
    script['name'] = name
    script['time'] = time
    script['messageNumber'] = message_number
    script['isEnabled'] = is_enabled
    # 201 is kept for compatibility with existing clients.
    return Response(
        status_code=201,
    )


@api.delete(
    path='/api/scripts/{script_id}',
)
async def _(
    script_id: int = Path(),
):
    """Delete a script by turning its slot into a ``None`` tombstone.

    Responds with 404 when the id is unknown or the script was already
    deleted.
    """
    if _find_script(script_id) is None:
        return _script_not_found()
    # Keep the slot so the ids (indexes) of the other scripts stay stable.
    scripts[script_id] = None
    return Response(
        status_code=204,
    )


@api.websocket(
    path='/ws/scripts',
)
async def _(
    websocket: WebSocket,
):
    """Register a WebSocket client and push the current script list.

    The list is broadcast to every registered connection, not only the new
    one. Messages received from the client are read and discarded.
    """
    await scripts_cm.connect(websocket)
    try:
        # NOTE(review): unlike GET /api/scripts this sends the raw list,
        # including None entries for deleted scripts - confirm that clients
        # expect that.
        await scripts_cm.broadcast(scripts)
        while True:
            await websocket.receive_json()
    except WebSocketDisconnect:
        pass
    finally:
        # Always unregister, including on errors other than a clean
        # disconnect (e.g. a malformed JSON message), so that no dead socket
        # stays in the broadcast list.
        scripts_cm.disconnect(websocket)