From 9d9124e00a723d3a185c244795f44860af0e8306 Mon Sep 17 00:00:00 2001 From: Luis-Hebendanz Date: Mon, 20 Nov 2023 22:58:01 +0100 Subject: [PATCH] Working broadcasting library --- flake.lock | 6 +- .../clan_cli/{types.py => clan_types.py} | 0 pkgs/clan-cli/clan_cli/webui/app.py | 14 +- .../clan_cli/webui/routers/messenger.html | 202 ++++++++++++++++++ .../clan_cli/webui/routers/socket_manager2.py | 54 +++++ 5 files changed, 271 insertions(+), 5 deletions(-) rename pkgs/clan-cli/clan_cli/{types.py => clan_types.py} (100%) create mode 100644 pkgs/clan-cli/clan_cli/webui/routers/messenger.html create mode 100644 pkgs/clan-cli/clan_cli/webui/routers/socket_manager2.py diff --git a/flake.lock b/flake.lock index 314e233..5234a63 100644 --- a/flake.lock +++ b/flake.lock @@ -58,11 +58,11 @@ }, "nixpkgs-for-iosl": { "locked": { - "lastModified": 1700505490, - "narHash": "sha256-MLF5dkExensQoByZCmsR/kdcwZoaY/j6/ctSvmQHBJc=", + "lastModified": 1700515317, + "narHash": "sha256-DSnKT3glZCKE0/Rc6ainSJUbUGC248HNKVRSbo9kxkM=", "owner": "Luis-Hebendanz", "repo": "nixpkgs", - "rev": "5f9d94794badee6fcb3230ccea75628a802baeab", + "rev": "bcd6be7a5ab22c94c775945fb16a61b5867f15d2", "type": "github" }, "original": { diff --git a/pkgs/clan-cli/clan_cli/types.py b/pkgs/clan-cli/clan_cli/clan_types.py similarity index 100% rename from pkgs/clan-cli/clan_cli/types.py rename to pkgs/clan-cli/clan_cli/clan_types.py diff --git a/pkgs/clan-cli/clan_cli/webui/app.py b/pkgs/clan-cli/clan_cli/webui/app.py index a9f1d2d..cf7195f 100644 --- a/pkgs/clan-cli/clan_cli/webui/app.py +++ b/pkgs/clan-cli/clan_cli/webui/app.py @@ -4,11 +4,13 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.routing import APIRoute from fastapi.staticfiles import StaticFiles +from contextlib import asynccontextmanager +from typing import Any from ..errors import ClanError from .assets import asset_path from .error_handlers import clan_error_handler -from .routers import health, root, socket_manager +from .routers import health, root, socket_manager, socket_manager2 origins = [ "http://localhost:3000", @@ -17,8 +19,15 @@ origins = [ log = logging.getLogger(__name__) +@asynccontextmanager +async def lifespan(app: FastAPI) -> Any: + await socket_manager2.brd.connect() + yield + await socket_manager2.brd.disconnect() + + def setup_app() -> FastAPI: - app = FastAPI() + app = FastAPI(lifespan=lifespan) app.add_middleware( CORSMiddleware, allow_origins=origins, @@ -30,6 +39,7 @@ def setup_app() -> FastAPI: app.include_router(health.router) app.include_router(socket_manager.router) + app.include_router(socket_manager2.router) # Needs to be last in register. Because of wildcard route app.include_router(root.router) diff --git a/pkgs/clan-cli/clan_cli/webui/routers/messenger.html b/pkgs/clan-cli/clan_cli/webui/routers/messenger.html new file mode 100644 index 0000000..a24b097 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/webui/routers/messenger.html @@ -0,0 +1,202 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/pkgs/clan-cli/clan_cli/webui/routers/socket_manager2.py b/pkgs/clan-cli/clan_cli/webui/routers/socket_manager2.py new file mode 100644 index 0000000..e8bf1ca --- /dev/null +++ b/pkgs/clan-cli/clan_cli/webui/routers/socket_manager2.py @@ -0,0 +1,54 @@ +# Requires: `starlette`, `uvicorn`, `jinja2` +# Run with `uvicorn example:app` +import anyio +from broadcaster import Broadcast +from fastapi import FastAPI, WebSocket, WebSocketDisconnect +from fastapi.responses import HTMLResponse +from fastapi import APIRouter, Response +import os +import logging +import asyncio + +log = logging.getLogger(__name__) +router = APIRouter() + + +brd = Broadcast("memory://") + + +@router.get("/ws2_example") +async def get() -> HTMLResponse: + + html = open(f"{os.getcwd()}/webui/routers/messenger.html").read() + return HTMLResponse(html) + +@router.websocket("/ws2") +async def chatroom_ws(websocket: WebSocket) -> None: + await websocket.accept() + + + async with anyio.create_task_group() as task_group: + # run until first is complete + async def run_chatroom_ws_receiver() -> None: + await chatroom_ws_receiver(websocket=websocket) + task_group.cancel_scope.cancel() + + task_group.start_soon(run_chatroom_ws_receiver) + log.warning("Started chatroom_ws_sender") + + await chatroom_ws_sender(websocket) + + +async def chatroom_ws_receiver(websocket: WebSocket) -> None: + async for message in websocket.iter_text(): + log.warning(f"Received message: {message}") + await brd.publish(channel="chatroom", message=message) + + +async def chatroom_ws_sender(websocket: WebSocket) -> None: + async with brd.subscribe(channel="chatroom") as subscriber: + log.warning("====>Subscribed to chatroom channel") + async for event in subscriber: + log.warning(f"Sending message: {event.message}") + await websocket.send_text(event.message) +