From 904301c20e26dbf12340a5bad4a624b9229f9b6c Mon Sep 17 00:00:00 2001 From: lassulus Date: Fri, 22 Sep 2023 15:37:30 +0200 Subject: [PATCH 01/21] api/vm/create: start vm --- pkgs/clan-cli/clan_cli/webui/routers/vms.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index 18c8c74..99b124c 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -52,6 +52,16 @@ def nix_build_vm(machine: str, flake_url: str) -> list[str]: ) +async def start_vm(vm_path: str) -> None: + proc = await asyncio.create_subprocess_exec( + vm_path, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + await proc.wait() + + @router.post("/api/vms/inspect") async def inspect_vm( flake_url: Annotated[str, Body()], flake_attr: Annotated[str, Body()] @@ -90,11 +100,15 @@ async def vm_build(vm: VmConfig) -> AsyncIterator[str]: stderr=asyncio.subprocess.PIPE, ) assert proc.stdout is not None and proc.stderr is not None + vm_path = "" async for line in proc.stdout: - yield line.decode("utf-8", "ignore") + vm_path = f'{line.decode("utf-8", "ignore").strip()}/bin/run-nixos-vm' + + await start_vm(vm_path) stderr = "" async for line in proc.stderr: stderr += line.decode("utf-8", "ignore") + yield line.decode("utf-8", "ignore") res = await proc.wait() if res != 0: raise NixBuildException( From 9dca1a4672d4d8b549d52480212074d7efb47fa6 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Fri, 22 Sep 2023 18:34:43 +0200 Subject: [PATCH 02/21] CLI: Added custom logger --- pkgs/clan-cli/clan_cli/custom_logger.py | 42 +++++++++++++++++++ pkgs/clan-cli/clan_cli/webui/app.py | 4 ++ .../clan_cli/webui/routers/machines.py | 4 ++ pkgs/clan-cli/clan_cli/webui/routers/vms.py | 7 +++- pkgs/clan-cli/clan_cli/webui/server.py | 4 +- 5 files changed, 57 insertions(+), 4 deletions(-) create mode 100644 pkgs/clan-cli/clan_cli/custom_logger.py diff --git a/pkgs/clan-cli/clan_cli/custom_logger.py b/pkgs/clan-cli/clan_cli/custom_logger.py new file mode 100644 index 0000000..242e802 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/custom_logger.py @@ -0,0 +1,42 @@ +import logging +import datetime + +class CustomFormatter(logging.Formatter): + + grey = "\x1b[38;20m" + yellow = "\x1b[33;20m" + red = "\x1b[31;20m" + bold_red = "\x1b[31;1m" + green = "\u001b[32m" + blue = "\u001b[34m" + + def format_str(color): + reset = "\x1b[0m" + return f"%(asctime)s - %(name)s - {color} %(levelname)s {reset} - %(message)s (%(filename)s:%(lineno)d)" + + FORMATS = { + logging.DEBUG: format_str(blue), + logging.INFO: format_str(green), + logging.WARNING: format_str(yellow), + logging.ERROR: format_str(red), + logging.CRITICAL: format_str(bold_red) + } + + def formatTime(self, record,datefmt=None): + now = datetime.datetime.now() + now = now.strftime("%H:%M:%S") + return now + + def format(self, record): + log_fmt = self.FORMATS.get(record.levelno) + formatter = logging.Formatter(log_fmt) + formatter.formatTime = self.formatTime + return formatter.format(record) + + +def register(level): + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG) + ch.setFormatter(CustomFormatter()) + logging.basicConfig(level=level, handlers=[ch]) + diff --git a/pkgs/clan-cli/clan_cli/webui/app.py b/pkgs/clan-cli/clan_cli/webui/app.py index 2eb53aa..8b7f645 100644 --- a/pkgs/clan-cli/clan_cli/webui/app.py +++ b/pkgs/clan-cli/clan_cli/webui/app.py @@ -2,7 +2,9 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.routing import APIRoute from fastapi.staticfiles import StaticFiles +import logging +from .. import custom_logger from .assets import asset_path from .routers import flake, health, machines, root, vms @@ -35,4 +37,6 @@ def setup_app() -> FastAPI: return app +custom_logger.register(logging.getLogger('uvicorn').level) app = setup_app() + diff --git a/pkgs/clan-cli/clan_cli/webui/routers/machines.py b/pkgs/clan-cli/clan_cli/webui/routers/machines.py index 61fd5ed..a2b7778 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/machines.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/machines.py @@ -19,6 +19,10 @@ from ..schemas import ( Status, ) +# Logging setup +import logging +log = logging.getLogger(__name__) + router = APIRouter() diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index 99b124c..8a55350 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -3,15 +3,16 @@ import json import shlex from typing import Annotated, AsyncIterator -from fastapi import APIRouter, Body, HTTPException, Request, status + +from fastapi import APIRouter, Body, HTTPException, Request, status, logger from fastapi.encoders import jsonable_encoder from fastapi.responses import JSONResponse, StreamingResponse from ...nix import nix_build, nix_eval from ..schemas import VmConfig, VmInspectResponse -router = APIRouter() +router = APIRouter() class NixBuildException(HTTPException): def __init__(self, msg: str, loc: list = ["body", "flake_attr"]): @@ -120,8 +121,10 @@ command output: {stderr} """ ) +import logging @router.post("/api/vms/create") async def create_vm(vm: Annotated[VmConfig, Body()]) -> StreamingResponse: return StreamingResponse(vm_build(vm)) + diff --git a/pkgs/clan-cli/clan_cli/webui/server.py b/pkgs/clan-cli/clan_cli/webui/server.py index 1b7164f..1c052a1 100644 --- a/pkgs/clan-cli/clan_cli/webui/server.py +++ b/pkgs/clan-cli/clan_cli/webui/server.py @@ -7,12 +7,11 @@ import webbrowser from contextlib import ExitStack, contextmanager from pathlib import Path from threading import Thread -from typing import Iterator +from typing import (Iterator, Dict, Any) # XXX: can we dynamically load this using nix develop? from uvicorn import run -logger = logging.getLogger(__name__) def defer_open_browser(base_url: str) -> None: @@ -87,5 +86,6 @@ def start_server(args: argparse.Namespace) -> None: port=args.port, log_level=args.log_level, reload=args.reload, + access_log=args.log_level == "debug", headers=headers, ) From b86e02e183648efd8bb2359159208d13d10aa5c1 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Fri, 22 Sep 2023 18:44:05 +0200 Subject: [PATCH 03/21] vscode debugging integration added + README updated --- pkgs/clan-cli/.vscode/launch.json | 16 ++++++++++++++++ pkgs/clan-cli/README.md | 24 ++++++++++++++++++++++++ 2 files changed, 40 insertions(+) create mode 100644 pkgs/clan-cli/.vscode/launch.json diff --git a/pkgs/clan-cli/.vscode/launch.json b/pkgs/clan-cli/.vscode/launch.json new file mode 100644 index 0000000..8dc9244 --- /dev/null +++ b/pkgs/clan-cli/.vscode/launch.json @@ -0,0 +1,16 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Clan Webui", + "type": "python", + "request": "launch", + "module": "clan_cli.webui", + "justMyCode": true, + "args": [ "--reload", "--no-open", "--log-level", "debug" ] + } + ] +} \ No newline at end of file diff --git a/pkgs/clan-cli/README.md b/pkgs/clan-cli/README.md index df9c509..ca831a6 100644 --- a/pkgs/clan-cli/README.md +++ b/pkgs/clan-cli/README.md @@ -28,6 +28,30 @@ To start a local developement environment instead, use the `--dev` flag: This will spawn two webserver, a python one to for the api and a nodejs one that rebuilds the ui on the fly. +## Run webui directly +Useful for vscode run and debug option +```bash +python -m clan_cli.webui --reload --no-open +``` + +Add this `launch.json` to your .vscode directory to have working breakpoints in your vscode editor. +```json +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Clan Webui", + "type": "python", + "request": "launch", + "module": "clan_cli.webui", + "justMyCode": true, + "args": [ "--reload", "--no-open", "--log-level", "debug" ] + } + ] +} +``` + + ## Run locally single-threaded for debugging By default tests run in parallel using pytest-parallel. From 7479fca82b47beff2ffd9eabbc0df0d25e1de0dc Mon Sep 17 00:00:00 2001 From: Qubasa Date: Fri, 22 Sep 2023 19:58:45 +0200 Subject: [PATCH 04/21] Started working on vm_create --- pkgs/clan-cli/clan_cli/custom_logger.py | 4 ++-- pkgs/clan-cli/clan_cli/webui/app.py | 8 ++++++-- pkgs/clan-cli/clan_cli/webui/routers/vms.py | 5 ++++- pkgs/clan-cli/tests/test_vms_api.py | 1 + 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/custom_logger.py b/pkgs/clan-cli/clan_cli/custom_logger.py index 242e802..52d3f1a 100644 --- a/pkgs/clan-cli/clan_cli/custom_logger.py +++ b/pkgs/clan-cli/clan_cli/custom_logger.py @@ -12,7 +12,7 @@ class CustomFormatter(logging.Formatter): def format_str(color): reset = "\x1b[0m" - return f"%(asctime)s - %(name)s - {color} %(levelname)s {reset} - %(message)s (%(filename)s:%(lineno)d)" + return f"{color}%(levelname)s{reset}:(%(filename)s:%(lineno)d): %(message)s" FORMATS = { logging.DEBUG: format_str(blue), @@ -36,7 +36,7 @@ class CustomFormatter(logging.Formatter): def register(level): ch = logging.StreamHandler() - ch.setLevel(logging.DEBUG) + ch.setLevel(level) ch.setFormatter(CustomFormatter()) logging.basicConfig(level=level, handlers=[ch]) diff --git a/pkgs/clan-cli/clan_cli/webui/app.py b/pkgs/clan-cli/clan_cli/webui/app.py index 8b7f645..f141806 100644 --- a/pkgs/clan-cli/clan_cli/webui/app.py +++ b/pkgs/clan-cli/clan_cli/webui/app.py @@ -11,6 +11,8 @@ from .routers import flake, health, machines, root, vms origins = [ "http://localhost:3000", ] +# Logging setup +log = logging.getLogger(__name__) def setup_app() -> FastAPI: @@ -37,6 +39,8 @@ def setup_app() -> FastAPI: return app -custom_logger.register(logging.getLogger('uvicorn').level) +#TODO: How do I get the log level from the command line in here? +custom_logger.register(logging.DEBUG) app = setup_app() - +log.warn("log warn") +log.debug("log debug") diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index 8a55350..95688b2 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -2,7 +2,7 @@ import asyncio import json import shlex from typing import Annotated, AsyncIterator - +import logging from fastapi import APIRouter, Body, HTTPException, Request, status, logger from fastapi.encoders import jsonable_encoder @@ -11,6 +11,8 @@ from fastapi.responses import JSONResponse, StreamingResponse from ...nix import nix_build, nix_eval from ..schemas import VmConfig, VmInspectResponse +# Logging setup +log = logging.getLogger(__name__) router = APIRouter() @@ -94,6 +96,7 @@ command output: async def vm_build(vm: VmConfig) -> AsyncIterator[str]: cmd = nix_build_vm(vm.flake_attr, flake_url=vm.flake_url) + log.debug(f"Running command: {shlex.join(cmd)}") proc = await asyncio.create_subprocess_exec( cmd[0], *cmd[1:], diff --git a/pkgs/clan-cli/tests/test_vms_api.py b/pkgs/clan-cli/tests/test_vms_api.py index 8935e6c..7d202a1 100644 --- a/pkgs/clan-cli/tests/test_vms_api.py +++ b/pkgs/clan-cli/tests/test_vms_api.py @@ -20,6 +20,7 @@ def test_inspect(api: TestClient, test_flake_with_core: Path) -> None: @pytest.mark.impure def test_create(api: TestClient, test_flake_with_core: Path) -> None: + print(f"flake_url: {test_flake_with_core} ") response = api.post( "/api/vms/create", json=dict( From d16bb5db266e3572b681e45b070bd4ee0690d82a Mon Sep 17 00:00:00 2001 From: Qubasa Date: Mon, 25 Sep 2023 16:28:32 +0200 Subject: [PATCH 05/21] Added threaded create_vm endpoint --- pkgs/clan-cli/.vscode/launch.json | 4 +- pkgs/clan-cli/README.md | 26 +-- pkgs/clan-cli/clan_cli/custom_logger.py | 3 +- pkgs/clan-cli/clan_cli/webui/app.py | 4 + .../clan_cli/webui/routers/machines.py | 7 +- pkgs/clan-cli/clan_cli/webui/routers/vms.py | 159 ++++++++++-------- pkgs/clan-cli/clan_cli/webui/schemas.py | 2 + pkgs/clan-cli/clan_cli/webui/server.py | 4 +- 8 files changed, 120 insertions(+), 89 deletions(-) diff --git a/pkgs/clan-cli/.vscode/launch.json b/pkgs/clan-cli/.vscode/launch.json index 8dc9244..a15810a 100644 --- a/pkgs/clan-cli/.vscode/launch.json +++ b/pkgs/clan-cli/.vscode/launch.json @@ -9,8 +9,8 @@ "type": "python", "request": "launch", "module": "clan_cli.webui", - "justMyCode": true, - "args": [ "--reload", "--no-open", "--log-level", "debug" ] + "justMyCode": false, + "args": [ "--reload", "--no-open", "--log-level", "debug" ], } ] } \ No newline at end of file diff --git a/pkgs/clan-cli/README.md b/pkgs/clan-cli/README.md index ca831a6..3358327 100644 --- a/pkgs/clan-cli/README.md +++ b/pkgs/clan-cli/README.md @@ -29,29 +29,31 @@ To start a local developement environment instead, use the `--dev` flag: This will spawn two webserver, a python one to for the api and a nodejs one that rebuilds the ui on the fly. ## Run webui directly + Useful for vscode run and debug option + ```bash python -m clan_cli.webui --reload --no-open ``` Add this `launch.json` to your .vscode directory to have working breakpoints in your vscode editor. + ```json { - "version": "0.2.0", - "configurations": [ - { - "name": "Clan Webui", - "type": "python", - "request": "launch", - "module": "clan_cli.webui", - "justMyCode": true, - "args": [ "--reload", "--no-open", "--log-level", "debug" ] - } - ] + "version": "0.2.0", + "configurations": [ + { + "name": "Clan Webui", + "type": "python", + "request": "launch", + "module": "clan_cli.webui", + "justMyCode": true, + "args": ["--reload", "--no-open", "--log-level", "debug"] + } + ] } ``` - ## Run locally single-threaded for debugging By default tests run in parallel using pytest-parallel. diff --git a/pkgs/clan-cli/clan_cli/custom_logger.py b/pkgs/clan-cli/clan_cli/custom_logger.py index 52d3f1a..8566b95 100644 --- a/pkgs/clan-cli/clan_cli/custom_logger.py +++ b/pkgs/clan-cli/clan_cli/custom_logger.py @@ -1,5 +1,6 @@ -import logging import datetime +import logging + class CustomFormatter(logging.Formatter): diff --git a/pkgs/clan-cli/clan_cli/webui/app.py b/pkgs/clan-cli/clan_cli/webui/app.py index f141806..0c03bcc 100644 --- a/pkgs/clan-cli/clan_cli/webui/app.py +++ b/pkgs/clan-cli/clan_cli/webui/app.py @@ -42,5 +42,9 @@ def setup_app() -> FastAPI: #TODO: How do I get the log level from the command line in here? custom_logger.register(logging.DEBUG) app = setup_app() + +for i in app.exception_handlers.items(): + log.info(f"Registered exception handler: {i}") + log.warn("log warn") log.debug("log debug") diff --git a/pkgs/clan-cli/clan_cli/webui/routers/machines.py b/pkgs/clan-cli/clan_cli/webui/routers/machines.py index a2b7778..1a19530 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/machines.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/machines.py @@ -1,3 +1,5 @@ +# Logging setup +import logging from typing import Annotated from fastapi import APIRouter, Body @@ -19,10 +21,7 @@ from ..schemas import ( Status, ) -# Logging setup -import logging log = logging.getLogger(__name__) - router = APIRouter() @@ -42,7 +41,7 @@ async def create_machine(machine: Annotated[MachineCreate, Body()]) -> MachineRe @router.get("/api/machines/{name}") async def get_machine(name: str) -> MachineResponse: - print("TODO") + log.error("TODO") return MachineResponse(machine=Machine(name=name, status=Status.UNKNOWN)) diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index 95688b2..9ed4579 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -1,45 +1,27 @@ import asyncio import json -import shlex -from typing import Annotated, AsyncIterator import logging +import os +import shlex +import uuid +from typing import Annotated, AsyncIterator -from fastapi import APIRouter, Body, HTTPException, Request, status, logger +from fastapi import APIRouter, Body, FastAPI, HTTPException, Request, status, BackgroundTasks from fastapi.encoders import jsonable_encoder from fastapi.responses import JSONResponse, StreamingResponse from ...nix import nix_build, nix_eval -from ..schemas import VmConfig, VmInspectResponse +from ..schemas import VmConfig, VmInspectResponse, VmCreateResponse # Logging setup log = logging.getLogger(__name__) router = APIRouter() - -class NixBuildException(HTTPException): - def __init__(self, msg: str, loc: list = ["body", "flake_attr"]): - detail = [ - { - "loc": loc, - "msg": msg, - "type": "value_error", - } - ] - super().__init__( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=detail - ) +app = FastAPI() -def nix_build_exception_handler( - request: Request, exc: NixBuildException -) -> JSONResponse: - return JSONResponse( - status_code=exc.status_code, - content=jsonable_encoder(dict(detail=exc.detail)), - ) - -def nix_inspect_vm(machine: str, flake_url: str) -> list[str]: +def nix_inspect_vm_cmd(machine: str, flake_url: str) -> list[str]: return nix_eval( [ f"{flake_url}#nixosConfigurations.{json.dumps(machine)}.config.system.clan.vm.config" @@ -47,7 +29,7 @@ def nix_inspect_vm(machine: str, flake_url: str) -> list[str]: ) -def nix_build_vm(machine: str, flake_url: str) -> list[str]: +def nix_build_vm_cmd(machine: str, flake_url: str) -> list[str]: return nix_build( [ f"{flake_url}#nixosConfigurations.{json.dumps(machine)}.config.system.build.vm" @@ -55,21 +37,13 @@ def nix_build_vm(machine: str, flake_url: str) -> list[str]: ) -async def start_vm(vm_path: str) -> None: - proc = await asyncio.create_subprocess_exec( - vm_path, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - await proc.wait() @router.post("/api/vms/inspect") async def inspect_vm( flake_url: Annotated[str, Body()], flake_attr: Annotated[str, Body()] ) -> VmInspectResponse: - cmd = nix_inspect_vm(flake_attr, flake_url=flake_url) + cmd = nix_inspect_vm_cmd(flake_attr, flake_url=flake_url) proc = await asyncio.create_subprocess_exec( cmd[0], *cmd[1:], @@ -94,40 +68,91 @@ command output: ) -async def vm_build(vm: VmConfig) -> AsyncIterator[str]: - cmd = nix_build_vm(vm.flake_attr, flake_url=vm.flake_url) - log.debug(f"Running command: {shlex.join(cmd)}") - proc = await asyncio.create_subprocess_exec( - cmd[0], - *cmd[1:], - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - assert proc.stdout is not None and proc.stderr is not None - vm_path = "" - async for line in proc.stdout: - vm_path = f'{line.decode("utf-8", "ignore").strip()}/bin/run-nixos-vm' - await start_vm(vm_path) - stderr = "" - async for line in proc.stderr: - stderr += line.decode("utf-8", "ignore") - yield line.decode("utf-8", "ignore") - res = await proc.wait() - if res != 0: - raise NixBuildException( - f""" -Failed to build vm from '{vm.flake_url}#{vm.flake_attr}'. -command: {shlex.join(cmd)} -exit code: {res} -command output: -{stderr} - """ +class NixBuildException(HTTPException): + def __init__(self, uuid: uuid.UUID, msg: str,loc: list = ["body", "flake_attr"]): + self.uuid = uuid + detail = [ + { + "loc": loc, + "uuid": str(uuid), + "msg": msg, + "type": "value_error", + } + ] + super().__init__( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=detail ) -import logging + + + +import threading +import subprocess +import uuid + + +class BuildVM(threading.Thread): + def __init__(self, vm: VmConfig, uuid: uuid.UUID): + # calling parent class constructor + threading.Thread.__init__(self) + + # constructor + self.vm: VmConfig = vm + self.uuid: uuid.UUID = uuid + self.log = logging.getLogger(__name__) + self.process: subprocess.Popen = None + + def run(self): + self.log.debug(f"BuildVM with uuid {self.uuid} started") + + cmd = nix_build_vm_cmd(self.vm.flake_attr, flake_url=self.vm.flake_url) + (out, err) = self.run_cmd(cmd) + vm_path = f'{out.strip()}/bin/run-nixos-vm' + + self.log.debug(f"vm_path: {vm_path}") + + (out, err) = self.run_cmd(vm_path) + + + def run_cmd(self, cmd: list[str]): + cwd=os.getcwd() + log.debug(f"Working directory: {cwd}") + log.debug(f"Running command: {shlex.join(cmd)}") + self.process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + cwd=cwd, + ) + + self.process.wait() + if self.process.returncode != 0: + raise NixBuildException(self.uuid, f"Failed to run command: {shlex.join(cmd)}") + + log.info("Successfully ran command") + return (self.process.stdout, self.process.stderr) + +POOL: dict[uuid.UUID, BuildVM] = {} + + +def nix_build_exception_handler( + request: Request, exc: NixBuildException +) -> JSONResponse: + log.error("NixBuildException: %s", exc) + del POOL[exc.uuid] + return JSONResponse( + status_code=exc.status_code, + content=jsonable_encoder(dict(detail=exc.detail)), + ) @router.post("/api/vms/create") -async def create_vm(vm: Annotated[VmConfig, Body()]) -> StreamingResponse: - return StreamingResponse(vm_build(vm)) +async def create_vm(vm: Annotated[VmConfig, Body()], background_tasks: BackgroundTasks) -> StreamingResponse: + handle_id = uuid.uuid4() + handle = BuildVM(vm, handle_id) + handle.start() + POOL[handle_id] = handle + return VmCreateResponse(uuid=str(handle_id)) + diff --git a/pkgs/clan-cli/clan_cli/webui/schemas.py b/pkgs/clan-cli/clan_cli/webui/schemas.py index 11a6037..8ee819c 100644 --- a/pkgs/clan-cli/clan_cli/webui/schemas.py +++ b/pkgs/clan-cli/clan_cli/webui/schemas.py @@ -43,6 +43,8 @@ class VmConfig(BaseModel): memory_size: int graphics: bool +class VmCreateResponse(BaseModel): + uuid: str class VmInspectResponse(BaseModel): config: VmConfig diff --git a/pkgs/clan-cli/clan_cli/webui/server.py b/pkgs/clan-cli/clan_cli/webui/server.py index 1c052a1..800cdab 100644 --- a/pkgs/clan-cli/clan_cli/webui/server.py +++ b/pkgs/clan-cli/clan_cli/webui/server.py @@ -1,5 +1,4 @@ import argparse -import logging import subprocess import time import urllib.request @@ -7,13 +6,12 @@ import webbrowser from contextlib import ExitStack, contextmanager from pathlib import Path from threading import Thread -from typing import (Iterator, Dict, Any) +from typing import Iterator # XXX: can we dynamically load this using nix develop? from uvicorn import run - def defer_open_browser(base_url: str) -> None: for i in range(5): try: From f6c8b963c195f76100da3800ca375b6295bd7a8c Mon Sep 17 00:00:00 2001 From: Qubasa Date: Mon, 25 Sep 2023 20:09:27 +0200 Subject: [PATCH 06/21] Improving endpoint --- pkgs/clan-cli/clan_cli/custom_logger.py | 16 +- pkgs/clan-cli/clan_cli/webui/app.py | 7 +- pkgs/clan-cli/clan_cli/webui/routers/vms.py | 171 +++++++++++++++----- pkgs/clan-cli/clan_cli/webui/schemas.py | 7 + pkgs/clan-cli/clan_cli/webui/server.py | 5 +- 5 files changed, 156 insertions(+), 50 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/custom_logger.py b/pkgs/clan-cli/clan_cli/custom_logger.py index 8566b95..b16fc89 100644 --- a/pkgs/clan-cli/clan_cli/custom_logger.py +++ b/pkgs/clan-cli/clan_cli/custom_logger.py @@ -1,9 +1,9 @@ import datetime import logging +from typing import Any class CustomFormatter(logging.Formatter): - grey = "\x1b[38;20m" yellow = "\x1b[33;20m" red = "\x1b[31;20m" @@ -11,7 +11,8 @@ class CustomFormatter(logging.Formatter): green = "\u001b[32m" blue = "\u001b[34m" - def format_str(color): + @staticmethod + def format_str(color: str) -> str: reset = "\x1b[0m" return f"{color}%(levelname)s{reset}:(%(filename)s:%(lineno)d): %(message)s" @@ -20,24 +21,23 @@ class CustomFormatter(logging.Formatter): logging.INFO: format_str(green), logging.WARNING: format_str(yellow), logging.ERROR: format_str(red), - logging.CRITICAL: format_str(bold_red) + logging.CRITICAL: format_str(bold_red), } - def formatTime(self, record,datefmt=None): + def format_time(self, record: Any, datefmt: Any = None) -> str: now = datetime.datetime.now() now = now.strftime("%H:%M:%S") return now - def format(self, record): + def format(self, record: Any) -> str: log_fmt = self.FORMATS.get(record.levelno) formatter = logging.Formatter(log_fmt) - formatter.formatTime = self.formatTime + formatter.formatTime = self.format_time return formatter.format(record) -def register(level): +def register(level: Any) -> None: ch = logging.StreamHandler() ch.setLevel(level) ch.setFormatter(CustomFormatter()) logging.basicConfig(level=level, handlers=[ch]) - diff --git a/pkgs/clan-cli/clan_cli/webui/app.py b/pkgs/clan-cli/clan_cli/webui/app.py index 0c03bcc..bd58678 100644 --- a/pkgs/clan-cli/clan_cli/webui/app.py +++ b/pkgs/clan-cli/clan_cli/webui/app.py @@ -1,10 +1,11 @@ +import logging + from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.routing import APIRoute from fastapi.staticfiles import StaticFiles -import logging -from .. import custom_logger +from .. import custom_logger from .assets import asset_path from .routers import flake, health, machines, root, vms @@ -39,7 +40,7 @@ def setup_app() -> FastAPI: return app -#TODO: How do I get the log level from the command line in here? +# TODO: How do I get the log level from the command line in here? custom_logger.register(logging.DEBUG) app = setup_app() diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index 9ed4579..47566fa 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -2,16 +2,27 @@ import asyncio import json import logging import os +import select +import queue import shlex -import uuid +import subprocess +import threading from typing import Annotated, AsyncIterator +from uuid import UUID, uuid4 -from fastapi import APIRouter, Body, FastAPI, HTTPException, Request, status, BackgroundTasks +from fastapi import ( + APIRouter, + BackgroundTasks, + Body, + HTTPException, + Request, + status, +) from fastapi.encoders import jsonable_encoder from fastapi.responses import JSONResponse, StreamingResponse from ...nix import nix_build, nix_eval -from ..schemas import VmConfig, VmInspectResponse, VmCreateResponse +from ..schemas import VmConfig, VmCreateResponse, VmInspectResponse, VmStatusResponse # Logging setup log = logging.getLogger(__name__) @@ -37,8 +48,6 @@ def nix_build_vm_cmd(machine: str, flake_url: str) -> list[str]: ) - - @router.post("/api/vms/inspect") async def inspect_vm( flake_url: Annotated[str, Body()], flake_attr: Annotated[str, Body()] @@ -68,9 +77,8 @@ command output: ) - class NixBuildException(HTTPException): - def __init__(self, uuid: uuid.UUID, msg: str,loc: list = ["body", "flake_attr"]): + def __init__(self, uuid: UUID, msg: str, loc: list = ["body", "flake_attr"]): self.uuid = uuid detail = [ { @@ -85,74 +93,161 @@ class NixBuildException(HTTPException): ) - -import threading -import subprocess -import uuid - +class ProcessState: + def __init__(self, proc: subprocess.Popen): + self.proc: subprocess.Process = proc + self.stdout: list[str] = [] + self.stderr: list[str] = [] + self.returncode: int | None = None + self.done: bool = False class BuildVM(threading.Thread): - def __init__(self, vm: VmConfig, uuid: uuid.UUID): + def __init__(self, vm: VmConfig, uuid: UUID): # calling parent class constructor threading.Thread.__init__(self) # constructor self.vm: VmConfig = vm - self.uuid: uuid.UUID = uuid + self.uuid: UUID = uuid self.log = logging.getLogger(__name__) - self.process: subprocess.Popen = None + self.procs: list[ProcessState] = [] + self.failed: bool = False + self.finished: bool = False def run(self): - self.log.debug(f"BuildVM with uuid {self.uuid} started") + try: - cmd = nix_build_vm_cmd(self.vm.flake_attr, flake_url=self.vm.flake_url) - (out, err) = self.run_cmd(cmd) - vm_path = f'{out.strip()}/bin/run-nixos-vm' + self.log.debug(f"BuildVM with uuid {self.uuid} started") + cmd = nix_build_vm_cmd(self.vm.flake_attr, flake_url=self.vm.flake_url) - self.log.debug(f"vm_path: {vm_path}") + proc = self.run_cmd(cmd) + out = proc.stdout + self.log.debug(f"out: {out}") - (out, err) = self.run_cmd(vm_path) + vm_path = f"{''.join(out)}/bin/run-nixos-vm" + self.log.debug(f"vm_path: {vm_path}") + self.run_cmd(vm_path) + self.finished = True + except Exception as e: + self.failed = True + self.finished = True + log.exception(e) - def run_cmd(self, cmd: list[str]): - cwd=os.getcwd() + def run_cmd(self, cmd: list[str]) -> ProcessState: + cwd = os.getcwd() log.debug(f"Working directory: {cwd}") log.debug(f"Running command: {shlex.join(cmd)}") - self.process = subprocess.Popen( + process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8", cwd=cwd, ) + state = ProcessState(process) + self.procs.append(state) - self.process.wait() - if self.process.returncode != 0: - raise NixBuildException(self.uuid, f"Failed to run command: {shlex.join(cmd)}") + while process.poll() is None: + # Check if stderr is ready to be read from + rlist, _, _ = select.select([process.stderr, process.stdout], [], [], 0) + if process.stderr in rlist: + line = process.stderr.readline() + state.stderr.append(line) + if process.stdout in rlist: + line = process.stdout.readline() + state.stdout.append(line) - log.info("Successfully ran command") - return (self.process.stdout, self.process.stderr) + state.returncode = process.returncode + state.done = True -POOL: dict[uuid.UUID, BuildVM] = {} + if process.returncode != 0: + raise NixBuildException( + self.uuid, f"Failed to run command: {shlex.join(cmd)}" + ) + + log.debug("Successfully ran command") + return state + + +class VmTaskPool: + def __init__(self) -> None: + self.lock: threading.RLock = threading.RLock() + self.pool: dict[UUID, BuildVM] = {} + + def __getitem__(self, uuid: str | UUID) -> BuildVM: + with self.lock: + if type(uuid) is UUID: + return self.pool[uuid] + else: + uuid = UUID(uuid) + return self.pool[uuid] + + def __setitem__(self, uuid: UUID, vm: BuildVM) -> None: + with self.lock: + if uuid in self.pool: + raise KeyError(f"VM with uuid {uuid} already exists") + if type(uuid) is not UUID: + raise TypeError("uuid must be of type UUID") + self.pool[uuid] = vm + + +POOL: VmTaskPool = VmTaskPool() def nix_build_exception_handler( request: Request, exc: NixBuildException ) -> JSONResponse: log.error("NixBuildException: %s", exc) - del POOL[exc.uuid] + # del POOL[exc.uuid] return JSONResponse( status_code=exc.status_code, content=jsonable_encoder(dict(detail=exc.detail)), ) +@router.get("/api/vms/{uuid}/status") +async def get_status(uuid: str) -> VmStatusResponse: + global POOL + handle = POOL[uuid] + + if handle.process.poll() is None: + return VmStatusResponse(running=True, status=0) + else: + return VmStatusResponse(running=False, status=handle.process.returncode) + + + +@router.get("/api/vms/{uuid}/logs") +async def get_logs(uuid: str) -> StreamingResponse: + async def stream_logs() -> AsyncIterator[str]: + global POOL + handle = POOL[uuid] + for proc in handle.procs.values(): + while True: + if proc.stdout.empty() and proc.stderr.empty() and not proc.done: + await asyncio.sleep(0.1) + continue + if proc.stdout.empty() and proc.stderr.empty() and proc.done: + break + for line in proc.stdout: + yield line + for line in proc.stderr: + yield line + + return StreamingResponse( + content=stream_logs(), + media_type="text/plain", + ) + + @router.post("/api/vms/create") -async def create_vm(vm: Annotated[VmConfig, Body()], background_tasks: BackgroundTasks) -> StreamingResponse: - handle_id = uuid.uuid4() - handle = BuildVM(vm, handle_id) +async def create_vm( + vm: Annotated[VmConfig, Body()], background_tasks: BackgroundTasks +) -> VmCreateResponse: + global POOL + uuid = uuid4() + handle = BuildVM(vm, uuid) handle.start() - POOL[handle_id] = handle - return VmCreateResponse(uuid=str(handle_id)) - - + POOL[uuid] = handle + return VmCreateResponse(uuid=str(uuid)) diff --git a/pkgs/clan-cli/clan_cli/webui/schemas.py b/pkgs/clan-cli/clan_cli/webui/schemas.py index 8ee819c..874e18a 100644 --- a/pkgs/clan-cli/clan_cli/webui/schemas.py +++ b/pkgs/clan-cli/clan_cli/webui/schemas.py @@ -43,9 +43,16 @@ class VmConfig(BaseModel): memory_size: int graphics: bool + +class VmStatusResponse(BaseModel): + status: int + running: bool + + class VmCreateResponse(BaseModel): uuid: str + class VmInspectResponse(BaseModel): config: VmConfig diff --git a/pkgs/clan-cli/clan_cli/webui/server.py b/pkgs/clan-cli/clan_cli/webui/server.py index 800cdab..8d67d5a 100644 --- a/pkgs/clan-cli/clan_cli/webui/server.py +++ b/pkgs/clan-cli/clan_cli/webui/server.py @@ -1,4 +1,5 @@ import argparse +import logging import subprocess import time import urllib.request @@ -11,6 +12,8 @@ from typing import Iterator # XXX: can we dynamically load this using nix develop? from uvicorn import run +log = logging.getLogger(__name__) + def defer_open_browser(base_url: str) -> None: for i in range(5): @@ -24,7 +27,7 @@ def defer_open_browser(base_url: str) -> None: @contextmanager def spawn_node_dev_server(host: str, port: int) -> Iterator[None]: - logger.info("Starting node dev server...") + log.info("Starting node dev server...") path = Path(__file__).parent.parent.parent.parent / "ui" with subprocess.Popen( [ From 04f3547be01fc1b87d7e6a836048ecc30baa1df9 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Tue, 26 Sep 2023 12:14:26 +0200 Subject: [PATCH 07/21] Added test --- pkgs/clan-cli/tests/test_vms_api.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkgs/clan-cli/tests/test_vms_api.py b/pkgs/clan-cli/tests/test_vms_api.py index 7d202a1..fdc1a09 100644 --- a/pkgs/clan-cli/tests/test_vms_api.py +++ b/pkgs/clan-cli/tests/test_vms_api.py @@ -32,3 +32,13 @@ def test_create(api: TestClient, test_flake_with_core: Path) -> None: ), ) assert response.status_code == 200, "Failed to inspect vm" + + uuid = response.json()["uuid"] + assert len(uuid) == 36 + assert uuid.count("-") == 4 + + response = api.get(f"/api/vms/{uuid}/status") + for line in response.stream: + print(line) + + assert response.status_code == 200, "Failed to get vm status" \ No newline at end of file From c2fb42e95375c6780befa37f281f1ac1ad8342ff Mon Sep 17 00:00:00 2001 From: Qubasa Date: Tue, 26 Sep 2023 19:36:01 +0200 Subject: [PATCH 08/21] Extracted threadpool to task_manager.py --- pkgs/clan-cli/clan_cli/webui/app.py | 6 +- pkgs/clan-cli/clan_cli/webui/routers/vms.py | 207 ++++++------------- pkgs/clan-cli/clan_cli/webui/task_manager.py | 114 ++++++++++ pkgs/clan-cli/tests/test_vms_api.py | 8 +- 4 files changed, 185 insertions(+), 150 deletions(-) create mode 100644 pkgs/clan-cli/clan_cli/webui/task_manager.py diff --git a/pkgs/clan-cli/clan_cli/webui/app.py b/pkgs/clan-cli/clan_cli/webui/app.py index bd58678..b392c21 100644 --- a/pkgs/clan-cli/clan_cli/webui/app.py +++ b/pkgs/clan-cli/clan_cli/webui/app.py @@ -28,8 +28,11 @@ def setup_app() -> FastAPI: app.include_router(flake.router) app.include_router(health.router) app.include_router(machines.router) - app.include_router(root.router) app.include_router(vms.router) + + # Needs to be last in register. Because of wildcard route + app.include_router(root.router) + app.add_exception_handler(vms.NixBuildException, vms.nix_build_exception_handler) app.mount("/static", StaticFiles(directory=asset_path()), name="static") @@ -37,6 +40,7 @@ def setup_app() -> FastAPI: for route in app.routes: if isinstance(route, APIRoute): route.operation_id = route.name # in this case, 'read_items' + log.debug(f"Registered route: {route}") return app diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index 47566fa..ba6881a 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -1,11 +1,10 @@ import asyncio import json import logging -import os -import select -import queue import shlex +import io import subprocess +import pipes import threading from typing import Annotated, AsyncIterator from uuid import UUID, uuid4 @@ -23,13 +22,10 @@ from fastapi.responses import JSONResponse, StreamingResponse from ...nix import nix_build, nix_eval from ..schemas import VmConfig, VmCreateResponse, VmInspectResponse, VmStatusResponse +from ..task_manager import BaseTask, get_task, register_task -# Logging setup log = logging.getLogger(__name__) - router = APIRouter() -app = FastAPI() - def nix_inspect_vm_cmd(machine: str, flake_url: str) -> list[str]: @@ -48,35 +44,6 @@ def nix_build_vm_cmd(machine: str, flake_url: str) -> list[str]: ) -@router.post("/api/vms/inspect") -async def inspect_vm( - flake_url: Annotated[str, Body()], flake_attr: Annotated[str, Body()] -) -> VmInspectResponse: - cmd = nix_inspect_vm_cmd(flake_attr, flake_url=flake_url) - proc = await asyncio.create_subprocess_exec( - cmd[0], - *cmd[1:], - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - stdout, stderr = await proc.communicate() - - if proc.returncode != 0: - raise NixBuildException( - f""" -Failed to evaluate vm from '{flake_url}#{flake_attr}'. -command: {shlex.join(cmd)} -exit code: {proc.returncode} -command output: -{stderr.decode("utf-8")} -""" - ) - data = json.loads(stdout) - return VmInspectResponse( - config=VmConfig(flake_url=flake_url, flake_attr=flake_attr, **data) - ) - - class NixBuildException(HTTPException): def __init__(self, uuid: UUID, msg: str, loc: list = ["body", "flake_attr"]): self.uuid = uuid @@ -93,146 +60,97 @@ class NixBuildException(HTTPException): ) -class ProcessState: - def __init__(self, proc: subprocess.Popen): - self.proc: subprocess.Process = proc - self.stdout: list[str] = [] - self.stderr: list[str] = [] - self.returncode: int | None = None - self.done: bool = False +class BuildVmTask(BaseTask): + def __init__(self, uuid: UUID, vm: VmConfig) -> None: + super().__init__(uuid) + self.vm = vm -class BuildVM(threading.Thread): - def __init__(self, vm: VmConfig, uuid: UUID): - # calling parent class constructor - threading.Thread.__init__(self) - - # constructor - self.vm: VmConfig = vm - self.uuid: UUID = uuid - self.log = logging.getLogger(__name__) - self.procs: list[ProcessState] = [] - self.failed: bool = False - self.finished: bool = False - - def run(self): + def run(self) -> None: try: - self.log.debug(f"BuildVM with uuid {self.uuid} started") cmd = nix_build_vm_cmd(self.vm.flake_attr, flake_url=self.vm.flake_url) proc = self.run_cmd(cmd) - out = proc.stdout - self.log.debug(f"out: {out}") + self.log.debug(f"stdout: {proc.stdout}") - vm_path = f"{''.join(out)}/bin/run-nixos-vm" + vm_path = f"{''.join(proc.stdout[0])}/bin/run-nixos-vm" self.log.debug(f"vm_path: {vm_path}") - self.run_cmd(vm_path) + self.run_cmd(vm_path) self.finished = True except Exception as e: self.failed = True self.finished = True log.exception(e) - def run_cmd(self, cmd: list[str]) -> ProcessState: - cwd = os.getcwd() - log.debug(f"Working directory: {cwd}") - log.debug(f"Running command: {shlex.join(cmd)}") - process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf-8", - cwd=cwd, - ) - state = ProcessState(process) - self.procs.append(state) - - while process.poll() is None: - # Check if stderr is ready to be read from - rlist, _, _ = select.select([process.stderr, process.stdout], [], [], 0) - if process.stderr in rlist: - line = process.stderr.readline() - state.stderr.append(line) - if process.stdout in rlist: - line = process.stdout.readline() - state.stdout.append(line) - - state.returncode = process.returncode - state.done = True - - if process.returncode != 0: - raise NixBuildException( - self.uuid, f"Failed to run command: {shlex.join(cmd)}" - ) - - log.debug("Successfully ran command") - return state - - -class VmTaskPool: - def __init__(self) -> None: - self.lock: threading.RLock = threading.RLock() - self.pool: dict[UUID, BuildVM] = {} - - def __getitem__(self, uuid: str | UUID) -> BuildVM: - with self.lock: - if type(uuid) is UUID: - return self.pool[uuid] - else: - uuid = UUID(uuid) - return self.pool[uuid] - - def __setitem__(self, uuid: UUID, vm: BuildVM) -> None: - with self.lock: - if uuid in self.pool: - raise KeyError(f"VM with uuid {uuid} already exists") - if type(uuid) is not UUID: - raise TypeError("uuid must be of type UUID") - self.pool[uuid] = vm - - -POOL: VmTaskPool = VmTaskPool() - def nix_build_exception_handler( request: Request, exc: NixBuildException ) -> JSONResponse: log.error("NixBuildException: %s", exc) - # del POOL[exc.uuid] return JSONResponse( status_code=exc.status_code, content=jsonable_encoder(dict(detail=exc.detail)), ) +################################## +# # +# ======== VM ROUTES ======== # +# # +################################## +@router.post("/api/vms/inspect") +async def inspect_vm( + flake_url: Annotated[str, Body()], flake_attr: Annotated[str, Body()] +) -> VmInspectResponse: + cmd = nix_inspect_vm_cmd(flake_attr, flake_url=flake_url) + proc = await asyncio.create_subprocess_exec( + cmd[0], + *cmd[1:], + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await proc.communicate() + + if proc.returncode != 0: + raise NixBuildException( + "" + f""" +Failed to evaluate vm from '{flake_url}#{flake_attr}'. +command: {shlex.join(cmd)} +exit code: {proc.returncode} +command output: +{stderr.decode("utf-8")} +""" + ) + data = json.loads(stdout) + return VmInspectResponse( + config=VmConfig(flake_url=flake_url, flake_attr=flake_attr, **data) + ) + + @router.get("/api/vms/{uuid}/status") async def get_status(uuid: str) -> VmStatusResponse: - global POOL - handle = POOL[uuid] - - if handle.process.poll() is None: - return VmStatusResponse(running=True, status=0) - else: - return VmStatusResponse(running=False, status=handle.process.returncode) - + task = get_task(uuid) + return VmStatusResponse(running=not task.finished, status=0) @router.get("/api/vms/{uuid}/logs") async def get_logs(uuid: str) -> StreamingResponse: async def stream_logs() -> AsyncIterator[str]: - global POOL - handle = POOL[uuid] - for proc in handle.procs.values(): - while True: - if proc.stdout.empty() and proc.stderr.empty() and not proc.done: - await asyncio.sleep(0.1) - continue - if proc.stdout.empty() and proc.stderr.empty() and proc.done: - break + task = get_task(uuid) + + for proc in task.procs: + if proc.done: + for line in proc.stderr: + yield line for line in proc.stdout: yield line - for line in proc.stderr: + else: + while True: + if proc.output_pipe.empty() and proc.done: + break + line = await proc.output_pipe.get() yield line return StreamingResponse( @@ -240,14 +158,9 @@ async def get_logs(uuid: str) -> StreamingResponse: media_type="text/plain", ) - @router.post("/api/vms/create") async def create_vm( vm: Annotated[VmConfig, Body()], background_tasks: BackgroundTasks ) -> VmCreateResponse: - global POOL - uuid = uuid4() - handle = BuildVM(vm, uuid) - handle.start() - POOL[uuid] = handle + uuid = register_task(BuildVmTask, vm) return VmCreateResponse(uuid=str(uuid)) diff --git a/pkgs/clan-cli/clan_cli/webui/task_manager.py b/pkgs/clan-cli/clan_cli/webui/task_manager.py new file mode 100644 index 0000000..3ee619e --- /dev/null +++ b/pkgs/clan-cli/clan_cli/webui/task_manager.py @@ -0,0 +1,114 @@ +import logging +import os +import queue +import select +import shlex +import subprocess +import threading +from uuid import UUID, uuid4 + +class CmdState: + def __init__(self, proc: subprocess.Popen) -> None: + self.proc: subprocess.Process = proc + self.stdout: list[str] = [] + self.stderr: list[str] = [] + self.output_pipe: asyncio.Queue = asyncio.Queue() + self.returncode: int | None = None + self.done: bool = False + +class BaseTask(threading.Thread): + def __init__(self, uuid: UUID) -> None: + # calling parent class constructor + threading.Thread.__init__(self) + + # constructor + self.uuid: UUID = uuid + self.log = logging.getLogger(__name__) + self.procs: list[CmdState] = [] + self.failed: bool = False + self.finished: bool = False + + def run(self) -> None: + self.finished = True + + def run_cmd(self, cmd: list[str]) -> CmdState: + cwd = os.getcwd() + self.log.debug(f"Working directory: {cwd}") + self.log.debug(f"Running command: {shlex.join(cmd)}") + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + cwd=cwd, + ) + state = CmdState(process) + self.procs.append(state) + + while process.poll() is None: + # Check if stderr is ready to be read from + rlist, _, _ = select.select([process.stderr, process.stdout], [], [], 0) + if process.stderr in rlist: + line = process.stderr.readline() + if line != "": + state.stderr.append(line.strip('\n')) + state.output_pipe.put_nowait(line) + if process.stdout in rlist: + line = process.stdout.readline() + if line != "": + state.stdout.append(line.strip('\n')) + state.output_pipe.put_nowait(line) + + state.returncode = process.returncode + state.done = True + + if process.returncode != 0: + raise RuntimeError( + f"Failed to run command: {shlex.join(cmd)}" + ) + + self.log.debug("Successfully ran command") + return state + + +class TaskPool: + def __init__(self) -> None: + self.lock: threading.RLock = threading.RLock() + self.pool: dict[UUID, BaseTask] = {} + + def __getitem__(self, uuid: str | UUID) -> BaseTask: + with self.lock: + if type(uuid) is UUID: + return self.pool[uuid] + else: + uuid = UUID(uuid) + return self.pool[uuid] + + + def __setitem__(self, uuid: UUID, vm: BaseTask) -> None: + with self.lock: + if uuid in self.pool: + raise KeyError(f"VM with uuid {uuid} already exists") + if type(uuid) is not UUID: + raise TypeError("uuid must be of type UUID") + self.pool[uuid] = vm + + +POOL: TaskPool = TaskPool() + + +def get_task(uuid: UUID) -> BaseTask: + global POOL + return POOL[uuid] + + +def register_task(task: BaseTask, *kwargs) -> UUID: + global POOL + if not issubclass(task, BaseTask): + raise TypeError("task must be a subclass of BaseTask") + + uuid = uuid4() + inst_task = task(uuid, *kwargs) + POOL[uuid] = inst_task + inst_task.start() + return uuid diff --git a/pkgs/clan-cli/tests/test_vms_api.py b/pkgs/clan-cli/tests/test_vms_api.py index fdc1a09..5aa6d91 100644 --- a/pkgs/clan-cli/tests/test_vms_api.py +++ b/pkgs/clan-cli/tests/test_vms_api.py @@ -31,14 +31,18 @@ def test_create(api: TestClient, test_flake_with_core: Path) -> None: graphics=True, ), ) - assert response.status_code == 200, "Failed to inspect vm" + assert response.status_code == 200, "Failed to create vm" uuid = response.json()["uuid"] assert len(uuid) == 36 assert uuid.count("-") == 4 response = api.get(f"/api/vms/{uuid}/status") + assert response.status_code == 200, "Failed to get vm status" + + response = api.get(f"/api/vms/{uuid}/logs") + print("=========LOGS==========") for line in response.stream: print(line) - assert response.status_code == 200, "Failed to get vm status" \ No newline at end of file + assert response.status_code == 200, "Failed to get vm logs" \ No newline at end of file From 991181bf3cb6764a16bbc643b3b0a0019fdf7a6a Mon Sep 17 00:00:00 2001 From: Qubasa Date: Tue, 26 Sep 2023 19:36:40 +0200 Subject: [PATCH 09/21] Integrated pytest into vscode --- pkgs/clan-cli/.vscode/settings.json | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 pkgs/clan-cli/.vscode/settings.json diff --git a/pkgs/clan-cli/.vscode/settings.json b/pkgs/clan-cli/.vscode/settings.json new file mode 100644 index 0000000..9b38853 --- /dev/null +++ b/pkgs/clan-cli/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "python.testing.pytestArgs": [ + "tests" + ], + "python.testing.unittestEnabled": false, + "python.testing.pytestEnabled": true +} \ No newline at end of file From 3a11c0a74689ab08b5915b4742aff0ecb19c35db Mon Sep 17 00:00:00 2001 From: Qubasa Date: Tue, 26 Sep 2023 20:00:30 +0200 Subject: [PATCH 10/21] Fixed vscode test debugging problem --- pkgs/clan-cli/.vscode/launch.json | 1 + pkgs/clan-cli/.vscode/settings.json | 10 +++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/pkgs/clan-cli/.vscode/launch.json b/pkgs/clan-cli/.vscode/launch.json index a15810a..ab2ef11 100644 --- a/pkgs/clan-cli/.vscode/launch.json +++ b/pkgs/clan-cli/.vscode/launch.json @@ -11,6 +11,7 @@ "module": "clan_cli.webui", "justMyCode": false, "args": [ "--reload", "--no-open", "--log-level", "debug" ], + } ] } \ No newline at end of file diff --git a/pkgs/clan-cli/.vscode/settings.json b/pkgs/clan-cli/.vscode/settings.json index 9b38853..66a301c 100644 --- a/pkgs/clan-cli/.vscode/settings.json +++ b/pkgs/clan-cli/.vscode/settings.json @@ -1,7 +1,15 @@ { "python.testing.pytestArgs": [ + // Coverage is not supported by vscode: + // https://github.com/Microsoft/vscode-python/issues/693 + // Note that this will make pytest fail if pytest-cov is not installed, + // if that's the case, then this option needs to be be removed (overrides + // can be set at a workspace level, it's up to you to decide what's the + // best approach). You might also prefer to only set this option + // per-workspace (wherever coverage is used). + "--no-cov", "tests" ], "python.testing.unittestEnabled": false, - "python.testing.pytestEnabled": true + "python.testing.pytestEnabled": true, } \ No newline at end of file From 98028d121f7493c69a270848295295deba7cb2a7 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Wed, 27 Sep 2023 01:52:38 +0200 Subject: [PATCH 11/21] Working log streaming --- pkgs/clan-cli/clan_cli/webui/routers/vms.py | 31 ++++---- pkgs/clan-cli/clan_cli/webui/task_manager.py | 77 +++++++++++--------- pkgs/clan-cli/tests/test_vms_api.py | 27 +++---- 3 files changed, 71 insertions(+), 64 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index ba6881a..f3cb272 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -2,12 +2,8 @@ import asyncio import json import logging import shlex -import io -import subprocess -import pipes -import threading -from typing import Annotated, AsyncIterator -from uuid import UUID, uuid4 +from typing import Annotated +from uuid import UUID from fastapi import ( APIRouter, @@ -76,7 +72,7 @@ class BuildVmTask(BaseTask): vm_path = f"{''.join(proc.stdout[0])}/bin/run-nixos-vm" self.log.debug(f"vm_path: {vm_path}") - self.run_cmd(vm_path) + #self.run_cmd(vm_path) self.finished = True except Exception as e: self.failed = True @@ -137,21 +133,24 @@ async def get_status(uuid: str) -> VmStatusResponse: @router.get("/api/vms/{uuid}/logs") async def get_logs(uuid: str) -> StreamingResponse: - async def stream_logs() -> AsyncIterator[str]: + def stream_logs(): + task = get_task(uuid) for proc in task.procs: if proc.done: + log.debug("stream logs and proc is done") for line in proc.stderr: - yield line + yield line + "\n" for line in proc.stdout: - yield line - else: - while True: - if proc.output_pipe.empty() and proc.done: - break - line = await proc.output_pipe.get() - yield line + yield line + "\n" + break + while True: + out = proc.output + line = out.get() + if line is None: + break + yield line return StreamingResponse( content=stream_logs(), diff --git a/pkgs/clan-cli/clan_cli/webui/task_manager.py b/pkgs/clan-cli/clan_cli/webui/task_manager.py index 3ee619e..e512eac 100644 --- a/pkgs/clan-cli/clan_cli/webui/task_manager.py +++ b/pkgs/clan-cli/clan_cli/webui/task_manager.py @@ -7,15 +7,18 @@ import subprocess import threading from uuid import UUID, uuid4 + class CmdState: def __init__(self, proc: subprocess.Popen) -> None: - self.proc: subprocess.Process = proc + global LOOP + self.proc: subprocess.Popen = proc self.stdout: list[str] = [] self.stderr: list[str] = [] - self.output_pipe: asyncio.Queue = asyncio.Queue() + self.output: queue.SimpleQueue = queue.SimpleQueue() self.returncode: int | None = None self.done: bool = False + class BaseTask(threading.Thread): def __init__(self, uuid: UUID) -> None: # calling parent class constructor @@ -35,63 +38,66 @@ class BaseTask(threading.Thread): cwd = os.getcwd() self.log.debug(f"Working directory: {cwd}") self.log.debug(f"Running command: {shlex.join(cmd)}") - process = subprocess.Popen( + p = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8", + # shell=True, cwd=cwd, ) - state = CmdState(process) - self.procs.append(state) + self.procs.append(CmdState(p)) + p_state = self.procs[-1] - while process.poll() is None: + while p.poll() is None: # Check if stderr is ready to be read from - rlist, _, _ = select.select([process.stderr, process.stdout], [], [], 0) - if process.stderr in rlist: - line = process.stderr.readline() + rlist, _, _ = select.select([p.stderr, p.stdout], [], [], 0) + if p.stderr in rlist: + line = p.stderr.readline() if line != "": - state.stderr.append(line.strip('\n')) - state.output_pipe.put_nowait(line) - if process.stdout in rlist: - line = process.stdout.readline() + p_state.stderr.append(line.strip("\n")) + self.log.debug(f"stderr: {line}") + p_state.output.put(line) + + if p.stdout in rlist: + line = p.stdout.readline() if line != "": - state.stdout.append(line.strip('\n')) - state.output_pipe.put_nowait(line) + p_state.stdout.append(line.strip("\n")) + self.log.debug(f"stdout: {line}") + p_state.output.put(line) - state.returncode = process.returncode - state.done = True + p_state.returncode = p.returncode + p_state.output.put(None) + p_state.done = True - if process.returncode != 0: - raise RuntimeError( - f"Failed to run command: {shlex.join(cmd)}" - ) + if p.returncode != 0: + raise RuntimeError(f"Failed to run command: {shlex.join(cmd)}") self.log.debug("Successfully ran command") - return state + return p_state class TaskPool: def __init__(self) -> None: - self.lock: threading.RLock = threading.RLock() + # self.lock: threading.RLock = threading.RLock() self.pool: dict[UUID, BaseTask] = {} def __getitem__(self, uuid: str | UUID) -> BaseTask: - with self.lock: - if type(uuid) is UUID: - return self.pool[uuid] - else: - uuid = UUID(uuid) - return self.pool[uuid] + # with self.lock: + if type(uuid) is UUID: + return self.pool[uuid] + else: + uuid = UUID(uuid) + return self.pool[uuid] def __setitem__(self, uuid: UUID, vm: BaseTask) -> None: - with self.lock: - if uuid in self.pool: - raise KeyError(f"VM with uuid {uuid} already exists") - if type(uuid) is not UUID: - raise TypeError("uuid must be of type UUID") - self.pool[uuid] = vm + # with self.lock: + if uuid in self.pool: + raise KeyError(f"VM with uuid {uuid} already exists") + if type(uuid) is not UUID: + raise TypeError("uuid must be of type UUID") + self.pool[uuid] = vm POOL: TaskPool = TaskPool() @@ -108,6 +114,7 @@ def register_task(task: BaseTask, *kwargs) -> UUID: raise TypeError("task must be a subclass of BaseTask") uuid = uuid4() + inst_task = task(uuid, *kwargs) POOL[uuid] = inst_task inst_task.start() diff --git a/pkgs/clan-cli/tests/test_vms_api.py b/pkgs/clan-cli/tests/test_vms_api.py index 5aa6d91..5a1a16d 100644 --- a/pkgs/clan-cli/tests/test_vms_api.py +++ b/pkgs/clan-cli/tests/test_vms_api.py @@ -4,18 +4,18 @@ import pytest from api import TestClient -@pytest.mark.impure -def test_inspect(api: TestClient, test_flake_with_core: Path) -> None: - response = api.post( - "/api/vms/inspect", - json=dict(flake_url=str(test_flake_with_core), flake_attr="vm1"), - ) - assert response.status_code == 200, "Failed to inspect vm" - config = response.json()["config"] - assert config.get("flake_attr") == "vm1" - assert config.get("cores") == 1 - assert config.get("memory_size") == 1024 - assert config.get("graphics") is True +# @pytest.mark.impure +# def test_inspect(api: TestClient, test_flake_with_core: Path) -> None: +# response = api.post( +# "/api/vms/inspect", +# json=dict(flake_url=str(test_flake_with_core), flake_attr="vm1"), +# ) +# assert response.status_code == 200, "Failed to inspect vm" +# config = response.json()["config"] +# assert config.get("flake_attr") == "vm1" +# assert config.get("cores") == 1 +# assert config.get("memory_size") == 1024 +# assert config.get("graphics") is True @pytest.mark.impure @@ -43,6 +43,7 @@ def test_create(api: TestClient, test_flake_with_core: Path) -> None: response = api.get(f"/api/vms/{uuid}/logs") print("=========LOGS==========") for line in response.stream: - print(line) + print(f"line: {line}") + assert line != b"", "Failed to get vm logs" assert response.status_code == 200, "Failed to get vm logs" \ No newline at end of file From a8bab7bb96291201a0f600fe9e20770fcdd42775 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Wed, 27 Sep 2023 02:00:20 +0200 Subject: [PATCH 12/21] Working log streaming --- pkgs/clan-cli/clan_cli/webui/task_manager.py | 29 ++++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/webui/task_manager.py b/pkgs/clan-cli/clan_cli/webui/task_manager.py index e512eac..cdbc815 100644 --- a/pkgs/clan-cli/clan_cli/webui/task_manager.py +++ b/pkgs/clan-cli/clan_cli/webui/task_manager.py @@ -79,25 +79,24 @@ class BaseTask(threading.Thread): class TaskPool: def __init__(self) -> None: - # self.lock: threading.RLock = threading.RLock() + self.lock: threading.RLock = threading.RLock() self.pool: dict[UUID, BaseTask] = {} def __getitem__(self, uuid: str | UUID) -> BaseTask: - # with self.lock: - if type(uuid) is UUID: - return self.pool[uuid] - else: - uuid = UUID(uuid) - return self.pool[uuid] + with self.lock: + if type(uuid) is UUID: + return self.pool[uuid] + else: + uuid = UUID(uuid) + return self.pool[uuid] - - def __setitem__(self, uuid: UUID, vm: BaseTask) -> None: - # with self.lock: - if uuid in self.pool: - raise KeyError(f"VM with uuid {uuid} already exists") - if type(uuid) is not UUID: - raise TypeError("uuid must be of type UUID") - self.pool[uuid] = vm + def __setitem__(self, uuid: UUID, task: BaseTask) -> None: + with self.lock: + if uuid in self.pool: + raise KeyError(f"Task with uuid {uuid} already exists") + if type(uuid) is not UUID: + raise TypeError("uuid must be of type UUID") + self.pool[uuid] = task POOL: TaskPool = TaskPool() From 82c3d91e85e4ad59535a47e0eea4ae2b7936149e Mon Sep 17 00:00:00 2001 From: Qubasa Date: Wed, 27 Sep 2023 02:01:59 +0200 Subject: [PATCH 13/21] Working version --- pkgs/clan-cli/clan_cli/webui/routers/vms.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index f3cb272..f0c35ac 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -133,8 +133,8 @@ async def get_status(uuid: str) -> VmStatusResponse: @router.get("/api/vms/{uuid}/logs") async def get_logs(uuid: str) -> StreamingResponse: + # Generator function that yields log lines as they are available def stream_logs(): - task = get_task(uuid) for proc in task.procs: From 8d390af122b948069eacb8dfdd3e8c892fa74df8 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Wed, 27 Sep 2023 02:11:13 +0200 Subject: [PATCH 14/21] Working log streaming --- pkgs/clan-cli/clan_cli/webui/routers/vms.py | 2 +- pkgs/clan-cli/clan_cli/webui/task_manager.py | 2 +- pkgs/clan-cli/tests/test_vms_api.py | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index f0c35ac..503962a 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -72,7 +72,7 @@ class BuildVmTask(BaseTask): vm_path = f"{''.join(proc.stdout[0])}/bin/run-nixos-vm" self.log.debug(f"vm_path: {vm_path}") - #self.run_cmd(vm_path) + self.run_cmd(vm_path) self.finished = True except Exception as e: self.failed = True diff --git a/pkgs/clan-cli/clan_cli/webui/task_manager.py b/pkgs/clan-cli/clan_cli/webui/task_manager.py index cdbc815..dd53a70 100644 --- a/pkgs/clan-cli/clan_cli/webui/task_manager.py +++ b/pkgs/clan-cli/clan_cli/webui/task_manager.py @@ -43,7 +43,7 @@ class BaseTask(threading.Thread): stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8", - # shell=True, + # shell=True, cwd=cwd, ) self.procs.append(CmdState(p)) diff --git a/pkgs/clan-cli/tests/test_vms_api.py b/pkgs/clan-cli/tests/test_vms_api.py index 5a1a16d..c06d94e 100644 --- a/pkgs/clan-cli/tests/test_vms_api.py +++ b/pkgs/clan-cli/tests/test_vms_api.py @@ -3,7 +3,6 @@ from pathlib import Path import pytest from api import TestClient - # @pytest.mark.impure # def test_inspect(api: TestClient, test_flake_with_core: Path) -> None: # response = api.post( @@ -43,7 +42,8 @@ def test_create(api: TestClient, test_flake_with_core: Path) -> None: response = api.get(f"/api/vms/{uuid}/logs") print("=========LOGS==========") for line in response.stream: - print(f"line: {line}") assert line != b"", "Failed to get vm logs" - - assert response.status_code == 200, "Failed to get vm logs" \ No newline at end of file + print(line.decode("utf-8"), end="") + print("=========END LOGS==========") + assert response.status_code == 200, "Failed to get vm logs" + time.sleep(10) \ No newline at end of file From 08eab785c64ac1370db18f2f1bd2a824313fad1c Mon Sep 17 00:00:00 2001 From: Qubasa Date: Wed, 27 Sep 2023 02:27:53 +0200 Subject: [PATCH 15/21] Completely working log streaming --- pkgs/clan-cli/clan_cli/webui/routers/vms.py | 3 ++- pkgs/clan-cli/tests/test_vms_api.py | 11 +++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index 503962a..18576f3 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -144,11 +144,12 @@ async def get_logs(uuid: str) -> StreamingResponse: yield line + "\n" for line in proc.stdout: yield line + "\n" - break + continue while True: out = proc.output line = out.get() if line is None: + log.debug("stream logs and line is None") break yield line diff --git a/pkgs/clan-cli/tests/test_vms_api.py b/pkgs/clan-cli/tests/test_vms_api.py index c06d94e..3f62ae7 100644 --- a/pkgs/clan-cli/tests/test_vms_api.py +++ b/pkgs/clan-cli/tests/test_vms_api.py @@ -40,10 +40,17 @@ def test_create(api: TestClient, test_flake_with_core: Path) -> None: assert response.status_code == 200, "Failed to get vm status" response = api.get(f"/api/vms/{uuid}/logs") - print("=========LOGS==========") + print("=========FLAKE LOGS==========") + for line in response.stream: + assert line != b"", "Failed to get vm logs" + print(line.decode("utf-8"), end="") + print("=========END LOGS==========") + assert response.status_code == 200, "Failed to get vm logs" + + response = api.get(f"/api/vms/{uuid}/logs") + print("=========VM LOGS==========") for line in response.stream: assert line != b"", "Failed to get vm logs" print(line.decode("utf-8"), end="") print("=========END LOGS==========") assert response.status_code == 200, "Failed to get vm logs" - time.sleep(10) \ No newline at end of file From dea49073cbcf7e95c54bca8c7ca146da6c1124cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Wed, 27 Sep 2023 11:11:30 +0200 Subject: [PATCH 16/21] cli/vms: serialize uuid already in the api --- pkgs/clan-cli/clan_cli/webui/routers/vms.py | 14 ++++---------- pkgs/clan-cli/clan_cli/webui/task_manager.py | 8 ++------ 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index 18576f3..179b9e1 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -5,14 +5,7 @@ import shlex from typing import Annotated from uuid import UUID -from fastapi import ( - APIRouter, - BackgroundTasks, - Body, - HTTPException, - Request, - status, -) +from fastapi import APIRouter, BackgroundTasks, Body, HTTPException, Request, status from fastapi.encoders import jsonable_encoder from fastapi.responses import JSONResponse, StreamingResponse @@ -126,13 +119,13 @@ command output: @router.get("/api/vms/{uuid}/status") -async def get_status(uuid: str) -> VmStatusResponse: +async def get_status(uuid: UUID) -> VmStatusResponse: task = get_task(uuid) return VmStatusResponse(running=not task.finished, status=0) @router.get("/api/vms/{uuid}/logs") -async def get_logs(uuid: str) -> StreamingResponse: +async def get_logs(uuid: UUID) -> StreamingResponse: # Generator function that yields log lines as they are available def stream_logs(): task = get_task(uuid) @@ -158,6 +151,7 @@ async def get_logs(uuid: str) -> StreamingResponse: media_type="text/plain", ) + @router.post("/api/vms/create") async def create_vm( vm: Annotated[VmConfig, Body()], background_tasks: BackgroundTasks diff --git a/pkgs/clan-cli/clan_cli/webui/task_manager.py b/pkgs/clan-cli/clan_cli/webui/task_manager.py index dd53a70..21ea7ae 100644 --- a/pkgs/clan-cli/clan_cli/webui/task_manager.py +++ b/pkgs/clan-cli/clan_cli/webui/task_manager.py @@ -82,13 +82,9 @@ class TaskPool: self.lock: threading.RLock = threading.RLock() self.pool: dict[UUID, BaseTask] = {} - def __getitem__(self, uuid: str | UUID) -> BaseTask: + def __getitem__(self, uuid: UUID) -> BaseTask: with self.lock: - if type(uuid) is UUID: - return self.pool[uuid] - else: - uuid = UUID(uuid) - return self.pool[uuid] + return self.pool[uuid] def __setitem__(self, uuid: UUID, task: BaseTask) -> None: with self.lock: From b34365077161b572c3b0bdb06617bb8af04730e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Wed, 27 Sep 2023 11:14:44 +0200 Subject: [PATCH 17/21] make vm terminate after boot --- pkgs/clan-cli/tests/test_flake_with_core/flake.nix | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkgs/clan-cli/tests/test_flake_with_core/flake.nix b/pkgs/clan-cli/tests/test_flake_with_core/flake.nix index 1337235..c4e03bd 100644 --- a/pkgs/clan-cli/tests/test_flake_with_core/flake.nix +++ b/pkgs/clan-cli/tests/test_flake_with_core/flake.nix @@ -17,6 +17,16 @@ system.stateVersion = lib.version; clan.networking.zerotier.controller.enable = true; + + systemd.services.shutdown-after-boot = { + enable = true; + wantedBy = [ "multi-user.target" ]; + after = [ "multi-user.target" ]; + script = '' + #!/usr/bin/env bash + shutdown -h now + ''; + }; }; }; }; From 80aa7f06fb5f6b039acc02e849bb6550a0512c78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Wed, 27 Sep 2023 11:28:22 +0200 Subject: [PATCH 18/21] task_manager: assert stdout/stderr exist to make mypy happy --- pkgs/clan-cli/clan_cli/webui/task_manager.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkgs/clan-cli/clan_cli/webui/task_manager.py b/pkgs/clan-cli/clan_cli/webui/task_manager.py index 21ea7ae..25890ee 100644 --- a/pkgs/clan-cli/clan_cli/webui/task_manager.py +++ b/pkgs/clan-cli/clan_cli/webui/task_manager.py @@ -53,6 +53,7 @@ class BaseTask(threading.Thread): # Check if stderr is ready to be read from rlist, _, _ = select.select([p.stderr, p.stdout], [], [], 0) if p.stderr in rlist: + assert p.stderr is not None line = p.stderr.readline() if line != "": p_state.stderr.append(line.strip("\n")) @@ -60,6 +61,7 @@ class BaseTask(threading.Thread): p_state.output.put(line) if p.stdout in rlist: + assert p.stdout is not None line = p.stdout.readline() if line != "": p_state.stdout.append(line.strip("\n")) From e5899c8e1016901e32f7c75fc9a790e73c7f2000 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Wed, 27 Sep 2023 11:28:38 +0200 Subject: [PATCH 19/21] custom_logger: fix type errror in format_time --- pkgs/clan-cli/clan_cli/custom_logger.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/custom_logger.py b/pkgs/clan-cli/clan_cli/custom_logger.py index b16fc89..66d4ac1 100644 --- a/pkgs/clan-cli/clan_cli/custom_logger.py +++ b/pkgs/clan-cli/clan_cli/custom_logger.py @@ -26,8 +26,7 @@ class CustomFormatter(logging.Formatter): def format_time(self, record: Any, datefmt: Any = None) -> str: now = datetime.datetime.now() - now = now.strftime("%H:%M:%S") - return now + return now.strftime("%H:%M:%S") def format(self, record: Any) -> str: log_fmt = self.FORMATS.get(record.levelno) From 244ae371448ff4873f2e8c0f3c8e83768a0ae5b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Wed, 27 Sep 2023 11:35:43 +0200 Subject: [PATCH 20/21] cli: simplify log handler --- pkgs/clan-cli/clan_cli/custom_logger.py | 53 +++++++++++-------------- 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/custom_logger.py b/pkgs/clan-cli/clan_cli/custom_logger.py index 66d4ac1..d8b7f9f 100644 --- a/pkgs/clan-cli/clan_cli/custom_logger.py +++ b/pkgs/clan-cli/clan_cli/custom_logger.py @@ -1,38 +1,33 @@ -import datetime import logging from typing import Any +grey = "\x1b[38;20m" +yellow = "\x1b[33;20m" +red = "\x1b[31;20m" +bold_red = "\x1b[31;1m" +green = "\u001b[32m" +blue = "\u001b[34m" + + +def get_formatter(color: str) -> logging.Formatter: + reset = "\x1b[0m" + return logging.Formatter( + f"{color}%(levelname)s{reset}:(%(filename)s:%(lineno)d): %(message)s" + ) + + +FORMATTER = { + logging.DEBUG: get_formatter(blue), + logging.INFO: get_formatter(green), + logging.WARNING: get_formatter(yellow), + logging.ERROR: get_formatter(red), + logging.CRITICAL: get_formatter(bold_red), +} + class CustomFormatter(logging.Formatter): - grey = "\x1b[38;20m" - yellow = "\x1b[33;20m" - red = "\x1b[31;20m" - bold_red = "\x1b[31;1m" - green = "\u001b[32m" - blue = "\u001b[34m" - - @staticmethod - def format_str(color: str) -> str: - reset = "\x1b[0m" - return f"{color}%(levelname)s{reset}:(%(filename)s:%(lineno)d): %(message)s" - - FORMATS = { - logging.DEBUG: format_str(blue), - logging.INFO: format_str(green), - logging.WARNING: format_str(yellow), - logging.ERROR: format_str(red), - logging.CRITICAL: format_str(bold_red), - } - - def format_time(self, record: Any, datefmt: Any = None) -> str: - now = datetime.datetime.now() - return now.strftime("%H:%M:%S") - def format(self, record: Any) -> str: - log_fmt = self.FORMATS.get(record.levelno) - formatter = logging.Formatter(log_fmt) - formatter.formatTime = self.format_time - return formatter.format(record) + return FORMATTER[record.levelno].format(record) def register(level: Any) -> None: From 4317e681cf8203b4f6dccb6defd6121f1f4c913e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Wed, 27 Sep 2023 11:45:07 +0200 Subject: [PATCH 21/21] cli: fix remaining typing errors --- pkgs/clan-cli/clan_cli/webui/routers/vms.py | 11 ++++------- pkgs/clan-cli/clan_cli/webui/task_manager.py | 5 +++-- pkgs/clan-cli/tests/test_vms_api.py | 3 +++ 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index 179b9e1..5b59329 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -2,7 +2,7 @@ import asyncio import json import logging import shlex -from typing import Annotated +from typing import Annotated, Iterator from uuid import UUID from fastapi import APIRouter, BackgroundTasks, Body, HTTPException, Request, status @@ -34,12 +34,10 @@ def nix_build_vm_cmd(machine: str, flake_url: str) -> list[str]: class NixBuildException(HTTPException): - def __init__(self, uuid: UUID, msg: str, loc: list = ["body", "flake_attr"]): - self.uuid = uuid + def __init__(self, msg: str, loc: list = ["body", "flake_attr"]): detail = [ { "loc": loc, - "uuid": str(uuid), "msg": msg, "type": "value_error", } @@ -65,7 +63,7 @@ class BuildVmTask(BaseTask): vm_path = f"{''.join(proc.stdout[0])}/bin/run-nixos-vm" self.log.debug(f"vm_path: {vm_path}") - self.run_cmd(vm_path) + self.run_cmd([vm_path]) self.finished = True except Exception as e: self.failed = True @@ -103,7 +101,6 @@ async def inspect_vm( if proc.returncode != 0: raise NixBuildException( - "" f""" Failed to evaluate vm from '{flake_url}#{flake_attr}'. command: {shlex.join(cmd)} @@ -127,7 +124,7 @@ async def get_status(uuid: UUID) -> VmStatusResponse: @router.get("/api/vms/{uuid}/logs") async def get_logs(uuid: UUID) -> StreamingResponse: # Generator function that yields log lines as they are available - def stream_logs(): + def stream_logs() -> Iterator[str]: task = get_task(uuid) for proc in task.procs: diff --git a/pkgs/clan-cli/clan_cli/webui/task_manager.py b/pkgs/clan-cli/clan_cli/webui/task_manager.py index 25890ee..21374cb 100644 --- a/pkgs/clan-cli/clan_cli/webui/task_manager.py +++ b/pkgs/clan-cli/clan_cli/webui/task_manager.py @@ -5,6 +5,7 @@ import select import shlex import subprocess import threading +from typing import Any from uuid import UUID, uuid4 @@ -105,14 +106,14 @@ def get_task(uuid: UUID) -> BaseTask: return POOL[uuid] -def register_task(task: BaseTask, *kwargs) -> UUID: +def register_task(task: type, *args: Any) -> UUID: global POOL if not issubclass(task, BaseTask): raise TypeError("task must be a subclass of BaseTask") uuid = uuid4() - inst_task = task(uuid, *kwargs) + inst_task = task(uuid, *args) POOL[uuid] = inst_task inst_task.start() return uuid diff --git a/pkgs/clan-cli/tests/test_vms_api.py b/pkgs/clan-cli/tests/test_vms_api.py index 3f62ae7..2939bc5 100644 --- a/pkgs/clan-cli/tests/test_vms_api.py +++ b/pkgs/clan-cli/tests/test_vms_api.py @@ -2,6 +2,7 @@ from pathlib import Path import pytest from api import TestClient +from httpx import SyncByteStream # @pytest.mark.impure # def test_inspect(api: TestClient, test_flake_with_core: Path) -> None: @@ -41,6 +42,7 @@ def test_create(api: TestClient, test_flake_with_core: Path) -> None: response = api.get(f"/api/vms/{uuid}/logs") print("=========FLAKE LOGS==========") + assert isinstance(response.stream, SyncByteStream) for line in response.stream: assert line != b"", "Failed to get vm logs" print(line.decode("utf-8"), end="") @@ -48,6 +50,7 @@ def test_create(api: TestClient, test_flake_with_core: Path) -> None: assert response.status_code == 200, "Failed to get vm logs" response = api.get(f"/api/vms/{uuid}/logs") + assert isinstance(response.stream, SyncByteStream) print("=========VM LOGS==========") for line in response.stream: assert line != b"", "Failed to get vm logs"