"""Pixel-board HTTP/WebSocket API.

Exposes a REST endpoint for reading and writing pixels, a WebSocket endpoint
that pushes every accepted pixel change to connected clients, and an HTTP
middleware that rate-limits requests using the frequency counters returned by
``database.get_requests_frequency``.
"""

import asyncio
import logging
import typing

from fastapi import Body, FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.exceptions import HTTPException
from fastapi.responses import JSONResponse, Response

import database
import models

logger = logging.getLogger(__name__)

app = FastAPI()

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an untracked task may be garbage-collected before it finishes.
_background_tasks: set[asyncio.Future] = set()

# (attribute of the frequency record, allowed maximum, message, Retry-After in
# seconds). Checked in this order; the first exceeded limit wins.
_RATE_LIMITS = (
    ('per_day', 10000, 'Слишком много запросов в день.', 86400),
    ('per_hour', 1000, 'Слишком много запросов в час.', 3600),
    ('per_minute', 100, 'Слишком много запросов в минуту.', 60),
    ('per_second', 1, 'Слишком много запросов в секунду.', 1),
)


def _on_background_task_done(task: asyncio.Future) -> None:
    """Release a finished background task and log its failure, if any."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        logger.warning('Background task failed: %r', error)


def _spawn(coroutine: typing.Awaitable) -> asyncio.Future:
    """Schedule *coroutine* without awaiting it, keeping it alive until done."""
    task = asyncio.ensure_future(coroutine)
    _background_tasks.add(task)
    task.add_done_callback(_on_background_task_done)
    return task


def _too_many_requests(message: str, delay: int) -> JSONResponse:
    """Build a 429 response carrying *message* and a ``Retry-After`` header.

    The body uses the ``{"detail": ...}`` envelope that FastAPI produces for an
    ``HTTPException``, so clients see the same shape as other API errors.
    """
    return JSONResponse(
        status_code=429,
        content={'detail': {'msg': message}},
        headers={'Retry-After': str(delay)},
    )


# NOTE: every handler below is intentionally still called ``function``. The
# decorators register each one before the name is rebound, and the name feeds
# the route name / generated OpenAPI operation id, so it is left untouched.
@app.middleware('http')
async def function(
    request: Request,
    callback: typing.Callable,
):
    """Rate-limit the caller by IP address, then log and forward the request.

    Returns a 429 response directly when a limit is exceeded. An
    ``HTTPException`` raised here would not reach FastAPI's exception handlers
    (they run inside user middleware) and would surface as a 500 instead.
    """
    requests_frequency = await database.get_requests_frequency(
        ip_address=request.client.host,
    )
    for attribute, limit, message, delay in _RATE_LIMITS:
        if getattr(requests_frequency, attribute) > limit:
            return _too_many_requests(message=message, delay=delay)

    # Request logging must not delay the response, so it runs in the background.
    _spawn(
        database.put_request(
            ip_address=request.client.host,
            method=request.method,
            url=str(request.url),
            is_secure=request.base_url.is_secure,
            user_agent=request.headers.get('User-Agent'),
            referer=request.headers.get('Referer'),
        )
    )
    return await callback(request)


class ConnectionManager:
    """Registry of accepted WebSocket connections used for broadcasting."""

    def __init__(self):
        self.connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake and register the socket for broadcasts."""
        await websocket.accept()
        self.connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Unregister the socket; a socket that is not registered is ignored."""
        try:
            self.connections.remove(websocket)
        except ValueError:
            # Already removed, e.g. by a failed broadcast send.
            pass

    async def _send(self, connection: WebSocket, data: dict):
        """Send *data* to one socket, dropping the socket if the send fails."""
        try:
            await connection.send_json(data)
        except Exception as error:  # boundary: one bad socket must not leak
            logger.debug('Dropping WebSocket after failed send: %r', error)
            self.disconnect(connection)

    async def broadcast(self, data: dict):
        """Schedule a JSON send of *data* to every registered socket.

        Sends run in the background, so this returns without waiting for slow
        clients. Iterates over a snapshot because failed sends unregister
        sockets concurrently.
        """
        for connection in tuple(self.connections):
            _spawn(self._send(connection, data))


manager = ConnectionManager()


@app.websocket('/ws/v1/pixels')
async def function(
    websocket: WebSocket,
):
    """Keep the socket registered for pixel broadcasts until the client leaves.

    Inbound messages are ignored. ``WebSocket.receive()`` reports a closed
    connection as a ``websocket.disconnect`` message rather than raising, so
    the loop checks the message type explicitly.
    """
    await manager.connect(websocket)
    try:
        while True:
            message = await websocket.receive()
            if message['type'] == 'websocket.disconnect':
                break
    except WebSocketDisconnect:
        # Not raised by receive() itself; handled defensively.
        pass
    finally:
        manager.disconnect(websocket)


@app.get('/api/v1/pixels')
async def function():
    """Return the whole pixel area as JSON."""
    pixels = await database.get_pixels()
    area = models.Area(pixels=pixels)
    return Response(
        content=area.json(),
        status_code=200,
        headers=models.headers,
        media_type='application/json',
    )


@app.put('/api/v1/pixels')
async def function(
    request: Request,
    x: int = Body(
        ...,
        ge=0,
        le=models.area_width - 1,
    ),
    y: int = Body(
        ...,
        ge=0,
        le=models.area_height - 1,
    ),
    color: int = Body(
        ...,
        ge=0x000000,
        le=0xFFFFFF,
    ),
):
    """Store one pixel and broadcast the change to WebSocket clients."""
    await database.put_pixel(
        x=x,
        y=y,
        color=color,
        ip_address=request.client.host,
    )
    await manager.broadcast({
        'x': x,
        'y': y,
        'color': color,
    })
    return Response(
        status_code=201,
        headers=models.headers,
    )


@app.options('/api/v1/pixels')
async def function():
    """Answer an OPTIONS request with the shared response headers."""
    return Response(
        headers=models.headers,
    )