195 строки
5.4 KiB
Python
195 строки
5.4 KiB
Python
import asyncio
|
|
from contextlib import asynccontextmanager
|
|
import hashlib
|
|
import os
|
|
|
|
import aiohttp
|
|
from fastapi import FastAPI, Query, WebSocket, WebSocketDisconnect
|
|
from fastapi.exceptions import HTTPException
|
|
from fastapi.responses import Response, HTMLResponse, FileResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from jinja2 import Environment, FileSystemLoader
|
|
|
|
import config
|
|
|
|
|
|
class TextStyle:
    """ANSI escape sequences for styling text written to a terminal.

    Each attribute is a plain ``str``; wrap text between one of the style
    codes and ``END`` to style it.
    """

    # Foreground colours.
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
    DARKCYAN = '\033[36m'
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'

    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Resets every colour and attribute set above.
    END = '\033[0m'
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Print the program banner, yield to the application, then print a shutdown notice.

    The code before ``yield`` prints program metadata, a summary of the
    CC BY-NC 4.0 licence terms and the overlay URL built from
    ``config.Main.host`` / ``config.Main.port``. The code after ``yield``
    prints a single shutdown line.

    Args:
        app: The application this context manager is attached to (unused).
    """
    on, off = TextStyle.BOLD, TextStyle.END

    # One entry per printed line; an empty string produces a blank line.
    banner = (
        f'{on}Program Name:{off}\tYT Music Live',
        f'{on}Version:{off}\t1.0.0',
        f'{on}Author:{off}\t\tcsasq',
        f'{on}Email:{off}\t\tcsasq@csasq.ru',
        f'{on}Website:{off}\thttps://csasq.ru/',
        f'{on}License:{off}\tCreative Commons Attribution-NonCommercial 4.0 International',
        '',
        f'{on}You are free to:{off}',
        f'\t{on}Share{off} — copy and redistribute the material in any medium or format.',
        f'\t{on}Adapt{off} — remix, transform, and build upon the material.',
        '\tThe licensor cannot revoke these freedoms as long as you follow the license terms.',
        f'{on}Under the following terms:{off}',
        f'\t{on}Attribution{off} — You must give appropriate credit , provide a link to the license, and indicate if changes were made. You may do so in any reasonable manner, but not in any way that suggests the licensor endorses you or your use.',
        f'\t{on}NonCommercial{off} — You may not use the material for commercial purposes.',
        f'\t{on}No additional restrictions{off} — You may not apply legal terms or technological measures that legally restrict others from doing anything the license permits.',
        '',
        f'{on}See the legal code:{off}',
        'https://creativecommons.org/licenses/by-nc/4.0/',
        '',
        f'{on}See the Git repository:{off}',
        'https://git.csasq.ru/csasq/yt-music-live',
        '',
        'Application startup complete (Press CTRL+C to quit)',
    )
    for line in banner:
        print(line)

    # Formatted last, after the static banner, so the config values are read
    # at the same point in the output as before.
    widget_url_parts = (on, config.Main.host, config.Main.port, off)
    print('Widget is available at %shttp://%s:%d/overlay%s' % widget_url_parts)

    yield

    print('Application shutdown complete')
|
|
|
|
|
|
# ASGI application. openapi_url, docs_url and redoc_url are all None, so no
# schema or interactive documentation routes are registered; `lifespan`
# supplies the startup/shutdown output.
app = FastAPI(openapi_url=None, docs_url=None, redoc_url=None, lifespan=lifespan)

# Serve the files in <config.Main.working_directory>/static under /static.
app.mount(
    path='/static',
    app=StaticFiles(directory=os.path.join(config.Main.working_directory, 'static')),
)

# Jinja2 environment used by the HTML route; enable_async=True is what makes
# `render_async()` available on its templates.
# NOTE(review): './templates' is resolved against the process working
# directory, whereas the static directory above is anchored at
# config.Main.working_directory — confirm this difference is intended.
env = Environment(
    loader=FileSystemLoader(searchpath=os.path.join('.', 'templates')),
    enable_async=True,
)
|
|
|
|
|
|
class ConnectionManager:
    """Registry of accepted WebSocket connections with fire-and-forget fan-out.

    Attributes:
        connections: Accepted sockets, in the order they connected.
    """

    def __init__(self) -> None:
        self.connections: list[WebSocket] = []
        # Strong references to in-flight send tasks. The event loop only keeps
        # weak references to tasks, so an unreferenced send could be
        # garbage-collected before it finishes.
        self._pending: set[asyncio.Future] = set()

    async def connect(self, websocket: WebSocket) -> None:
        """Accept the WebSocket handshake and register the connection."""
        await websocket.accept()
        self.connections.append(websocket)

    def disconnect(self, websocket: WebSocket) -> None:
        """Unregister a connection; a connection that is not registered is ignored.

        Tolerating an unknown socket makes the call idempotent, so a handler
        may call it after a failed broadcast has already dropped the socket.
        """
        try:
            self.connections.remove(websocket)
        except ValueError:
            pass

    async def broadcast(self, data: dict) -> None:
        """Schedule ``send_json(data)`` on every registered connection.

        The sends run as background tasks, so this coroutine returns without
        waiting for delivery. A connection whose send raises is unregistered
        so later broadcasts skip it.

        Args:
            data: JSON-serialisable payload passed to each ``send_json`` call.
        """
        # Iterate over a snapshot: done-callbacks may unregister connections.
        for connection in tuple(self.connections):
            task = asyncio.ensure_future(connection.send_json(data))
            self._pending.add(task)
            task.add_done_callback(
                # Bind the current connection as a default argument so each
                # callback refers to its own socket (avoids late binding).
                lambda finished, target=connection: self._settle(target, finished)
            )

    def _settle(self, websocket: WebSocket, task: asyncio.Future) -> None:
        """Release a finished send task and drop its connection if the send failed."""
        self._pending.discard(task)
        if task.cancelled():
            return
        # Calling exception() also marks the error as retrieved, which avoids
        # the "Task exception was never retrieved" log entry.
        if task.exception() is not None:
            self.disconnect(websocket)
|
|
|
|
|
|
# Connections accepted on /ws/v1/overlay; they receive what /ws/v1/server clients send.
overlay_manager = ConnectionManager()
# Connections accepted on /ws/v1/server; they receive what /ws/v1/overlay clients send.
server_manager = ConnectionManager()
|
|
|
|
|
|
@app.get(
    path='/overlay',
)
async def _():
    """Render the ``overlay.jinja2`` template and return it as an HTML response."""
    page = await env.get_template('overlay.jinja2').render_async()
    return HTMLResponse(content=page, status_code=200)
|
|
|
|
|
|
@app.get(
    path='/image',
)
async def _(
    src: str = Query(
        default=...,
    ),
):
    """Serve the image at ``src`` through an on-disk cache.

    The cache file lives in ``config.Main.images_directory`` and is named
    after the SHA-1 hex digest of the URL. On a cache miss the image is
    downloaded first and only then stored.

    Args:
        src: URL of the image to fetch (required query parameter).

    Returns:
        A ``FileResponse`` for the cached file.

    Raises:
        HTTPException: With the upstream status code when the upstream
            response is not OK, or with 502 when the request itself fails.
    """
    # NOTE(review): `src` is caller-supplied and fetched server-side with no
    # scheme/host allow-list (SSRF surface) — consider restricting it if this
    # endpoint is reachable from outside the local machine.

    # UTF-8 yields the same bytes as ASCII for ASCII-only URLs, so existing
    # cache names stay valid, while non-ASCII URLs no longer raise.
    image_name = hashlib.sha1(src.encode('utf-8')).hexdigest()
    image_path = os.path.join(
        config.Main.images_directory,
        image_name,
    )
    if not os.path.exists(image_path):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(src) as response:
                    if not response.ok:
                        raise HTTPException(
                            status_code=response.status,
                        )
                    # Read the whole body before touching the cache so a
                    # failed download never leaves an empty file behind.
                    payload = await response.read()
        except aiohttp.ClientError as error:
            raise HTTPException(
                status_code=502,
            ) from error
        os.makedirs(
            name=config.Main.images_directory,
            exist_ok=True,
        )
        # Write next to the final path and rename into place, so a reader
        # never sees a partially written cache entry.
        partial_path = f'{image_path}.{os.getpid()}.part'
        with open(
            file=partial_path,
            mode='wb',
        ) as file:
            file.write(payload)
        os.replace(partial_path, image_path)
    return FileResponse(
        path=image_path,
    )
|
|
|
|
|
|
@app.options(
    path='/api/v1/overlay',
)
async def _():
    """Answer ``OPTIONS /api/v1/overlay`` with wildcard CORS headers and status 200."""
    cors_headers = dict.fromkeys(
        (
            'Access-Control-Allow-Origin',
            'Access-Control-Allow-Methods',
            'Access-Control-Allow-Headers',
        ),
        '*',
    )
    return Response(headers=cors_headers, status_code=200)
|
|
|
|
|
|
@app.websocket('/ws/v1/overlay')
async def _(
    websocket: WebSocket,
):
    """Relay every JSON message from an overlay client to all server clients.

    The socket is registered with ``overlay_manager`` for as long as the
    receive loop runs.

    Args:
        websocket: The incoming WebSocket connection.
    """
    await overlay_manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_json()
            await server_manager.broadcast(data)
    except WebSocketDisconnect:
        # The peer closed the socket; this is the normal way out of the loop.
        pass
    finally:
        # Unregister on every exit path, not only a clean disconnect, so a
        # malformed frame or task cancellation cannot leave a dead socket in
        # the manager.
        overlay_manager.disconnect(websocket)
|
|
|
|
|
|
@app.websocket('/ws/v1/server')
async def _(
    websocket: WebSocket,
):
    """Relay every JSON message from a server client to all overlay clients.

    The socket is registered with ``server_manager`` for as long as the
    receive loop runs.

    Args:
        websocket: The incoming WebSocket connection.
    """
    await server_manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_json()
            await overlay_manager.broadcast(data)
    except WebSocketDisconnect:
        # The peer closed the socket; this is the normal way out of the loop.
        pass
    finally:
        # Unregister on every exit path, not only a clean disconnect, so a
        # malformed frame or task cancellation cannot leave a dead socket in
        # the manager.
        server_manager.disconnect(websocket)
|