generated from Luis/nextjs-python-web-template
Fixed background task not stopping on detach
This commit is contained in:
@@ -33,6 +33,7 @@ for u in cors_url:
|
||||
# Logging setup
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI) -> Any:
|
||||
await socket_manager2.brd.connect()
|
||||
|
||||
@@ -2,7 +2,6 @@ import logging
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import List, Optional
|
||||
import asyncio
|
||||
|
||||
import httpx
|
||||
from fastapi import APIRouter, BackgroundTasks, Depends
|
||||
@@ -16,7 +15,6 @@ from ..schemas import (
|
||||
Resolution,
|
||||
Service,
|
||||
ServiceCreate,
|
||||
ServicesByName,
|
||||
)
|
||||
from ..tags import Tags
|
||||
|
||||
@@ -46,9 +44,7 @@ def get_all_services(
|
||||
return services
|
||||
|
||||
|
||||
@router.get(
|
||||
"/api/v1/service", response_model=List[Service], tags=[Tags.services]
|
||||
)
|
||||
@router.get("/api/v1/service", response_model=List[Service], tags=[Tags.services])
|
||||
def get_service_by_did(
|
||||
entity_did: str = "did:sov:test:1234",
|
||||
skip: int = 0,
|
||||
@@ -73,6 +69,7 @@ def get_services_without_entity(
|
||||
service = sql_crud.get_services_without_entity_id(db, entity_did=entity_did)
|
||||
return service
|
||||
|
||||
|
||||
@router.delete("/api/v1/service", tags=[Tags.services])
|
||||
def delete_service(
|
||||
entity_did: str = "did:sov:test:1234",
|
||||
@@ -129,9 +126,7 @@ def get_all_entities(
|
||||
return entities
|
||||
|
||||
|
||||
@router.get(
|
||||
"/api/v1/entity", response_model=Optional[Entity], tags=[Tags.entities]
|
||||
)
|
||||
@router.get("/api/v1/entity", response_model=Optional[Entity], tags=[Tags.entities])
|
||||
def get_entity_by_did(
|
||||
entity_did: str = "did:sov:test:1234",
|
||||
db: Session = Depends(sql_db.get_db),
|
||||
@@ -151,11 +146,44 @@ def get_attached_entities(
|
||||
entities = sql_crud.get_attached_entities(db, skip=skip, limit=limit)
|
||||
return entities
|
||||
|
||||
|
||||
@router.post("/api/v1/detach", tags=[Tags.entities])
|
||||
def detach_entity(
|
||||
background_tasks: BackgroundTasks,
|
||||
entity_did: str = "did:sov:test:1234",
|
||||
skip: int = 0,
|
||||
limit: int = 100,
|
||||
db: Session = Depends(sql_db.get_db),
|
||||
) -> dict[str, str]:
|
||||
entity = sql_crud.get_entity_by_did(db, did=entity_did)
|
||||
if entity is None:
|
||||
raise ClanError(f"Entity with did '{entity_did}' not found")
|
||||
sql_crud.set_stop_health_task(db, entity_did, True)
|
||||
return {"message": f"Detached {entity_did} successfully"}
|
||||
|
||||
|
||||
@router.post("/api/v1/attach", tags=[Tags.entities])
|
||||
def attach_entity(
|
||||
background_tasks: BackgroundTasks,
|
||||
entity_did: str = "did:sov:test:1234",
|
||||
skip: int = 0,
|
||||
limit: int = 100,
|
||||
db: Session = Depends(sql_db.get_db),
|
||||
) -> dict[str, str]:
|
||||
entity = sql_crud.get_entity_by_did(db, did=entity_did)
|
||||
if entity is None:
|
||||
raise ClanError(f"Entity with did '{entity_did}' not found")
|
||||
url = f"http://{entity.ip}"
|
||||
sql_crud.set_stop_health_task(db, entity_did, False)
|
||||
print("Start health query at", url)
|
||||
background_tasks.add_task(attach_entity_loc, db, entity_did)
|
||||
return {"message": f"Started attachment task for {entity.name}"}
|
||||
|
||||
|
||||
@router.get("/api/v1/is_attached", tags=[Tags.entities])
|
||||
def is_attached(
|
||||
entity_did: str = "did:sov:test:1234",
|
||||
db: Session = Depends(sql_db.get_db)) -> dict[str, str]:
|
||||
|
||||
entity_did: str = "did:sov:test:1234", db: Session = Depends(sql_db.get_db)
|
||||
) -> dict[str, str]:
|
||||
entity = sql_crud.get_entity_by_did(db, did=entity_did)
|
||||
|
||||
if entity is None:
|
||||
@@ -171,74 +199,35 @@ def is_attached(
|
||||
url = f"http://{entity.ip}"
|
||||
raise ClanError(f"Entity at {url} not reachable")
|
||||
|
||||
db.refresh(entity)
|
||||
return {"message": f"Attached to {entity.name} successfully"}
|
||||
|
||||
|
||||
def attach_entity_loc(db: Session, entity_did: str) -> None:
|
||||
entity = sql_crud.get_entity_by_did(db, did=entity_did)
|
||||
try:
|
||||
assert entity is not None
|
||||
url = f"http://{entity.ip}"
|
||||
|
||||
@router.post("/api/v1/detach", tags=[Tags.entities])
|
||||
def detach_entity(
|
||||
background_tasks: BackgroundTasks,
|
||||
entity_did: str = "did:sov:test:1234",
|
||||
skip: int = 0,
|
||||
limit: int = 100,
|
||||
db: Session = Depends(sql_db.get_db),
|
||||
) -> dict[str, str]:
|
||||
sql_crud.stop_entity_health_task(db, entity_did)
|
||||
return {"message": f"Detached {entity_did} successfully"}
|
||||
|
||||
|
||||
@router.post("/api/v1/attach", tags=[Tags.entities])
|
||||
def attach_entity(
|
||||
background_tasks: BackgroundTasks,
|
||||
entity_did: str = "did:sov:test:1234",
|
||||
skip: int = 0,
|
||||
limit: int = 100
|
||||
) -> dict[str, str]:
|
||||
# entity = sql_crud.get_entity_by_did(db, did=entity_did)
|
||||
# if entity is None:
|
||||
# raise ClanError(f"Entity with did '{entity_did}' not found")
|
||||
#url = f"http://{entity.ip}"
|
||||
#log.info("Start health query at", url)
|
||||
background_tasks.add_task(attach_entity_loc, entity_did)
|
||||
return {"message": f"Started attachment task for {entity_did}"}
|
||||
|
||||
|
||||
def attach_entity_loc(entity_did: str) -> None:
|
||||
with sql_db.SessionLocal() as db:
|
||||
entity = sql_crud.get_entity_by_did(db, did=entity_did)
|
||||
while entity.stop_health_task is False:
|
||||
entity = sql_crud.get_entity_by_did(db, did=entity_did)
|
||||
assert entity is not None
|
||||
log.warning(f"Stop health status task for {entity.stop_health_task}")
|
||||
response = httpx.get(url, timeout=2)
|
||||
if response.status_code != 200:
|
||||
raise ClanError(
|
||||
f"Entity with did '{entity_did}' returned {response.status_code}"
|
||||
)
|
||||
|
||||
if entity.attached is False:
|
||||
sql_crud.set_attached_by_entity_did(db, entity_did, True)
|
||||
if entity is None:
|
||||
raise ClanError(f"Entity with did '{entity_did}' has been deleted")
|
||||
|
||||
time.sleep(1)
|
||||
# entity = sql_crud.get_entity_by_did(db, did=entity_did)
|
||||
# assert entity is not None
|
||||
# try:
|
||||
|
||||
# while True:
|
||||
|
||||
|
||||
# if entity.stop_health_task:
|
||||
# print(f"Stop health status task for {entity.name}")
|
||||
# break
|
||||
|
||||
# url = f"http://{entity.ip}"
|
||||
# response = httpx.get(url, timeout=2)
|
||||
# #log.warning(f"Response {response.status_code} from {url}")
|
||||
# print(f"{entity.name}: stop_health_task: {entity.stop_health_task}")
|
||||
# if response.status_code != 200:
|
||||
# raise ClanError(f"Entity with did '{entity_did}' returned {response.status_code}")
|
||||
|
||||
# if entity.attached is False:
|
||||
# sql_crud.set_attached_by_entity_did(db, entity_did, True)
|
||||
# if entity is None:
|
||||
# raise ClanError(f"Entity with did '{entity_did}' has been deleted")
|
||||
|
||||
# time.sleep(1)
|
||||
# except Exception:
|
||||
# log.warning(f"Entity {entity_did} not reachable at {url}")
|
||||
# finally:
|
||||
# sql_crud.set_attached_by_entity_did(db, entity_did, False)
|
||||
db.refresh(entity)
|
||||
except Exception:
|
||||
print(f"Entity {entity_did} not reachable at {url}")
|
||||
finally:
|
||||
sql_crud.set_attached_by_entity_did(db, entity_did, False)
|
||||
sql_crud.set_stop_health_task(db, entity_did, False)
|
||||
|
||||
|
||||
@router.delete("/api/v1/entity", tags=[Tags.entities])
|
||||
|
||||
@@ -64,7 +64,9 @@ def get_services_without_entity_id(
|
||||
# #
|
||||
#########################
|
||||
def create_entity(db: Session, entity: schemas.EntityCreate) -> sql_models.Entity:
|
||||
db_entity = sql_models.Entity(**entity.dict(), attached=False, stop_health_task=False)
|
||||
db_entity = sql_models.Entity(
|
||||
**entity.dict(), attached=False, stop_health_task=False
|
||||
)
|
||||
db.add(db_entity)
|
||||
db.commit()
|
||||
db.refresh(db_entity)
|
||||
@@ -100,14 +102,12 @@ def get_attached_entities(
|
||||
|
||||
|
||||
# Returns same entity if setting didnt changed something
|
||||
def stop_entity_health_task(
|
||||
db: Session, entity_did: str
|
||||
) -> None:
|
||||
def set_stop_health_task(db: Session, entity_did: str, value: bool) -> None:
|
||||
db_entity = get_entity_by_did(db, entity_did)
|
||||
if db_entity is None:
|
||||
raise ClanError(f"Entity with did '{entity_did}' not found")
|
||||
|
||||
setattr(db_entity, "stop_health_task", True)
|
||||
setattr(db_entity, "stop_health_task", value)
|
||||
|
||||
# save changes in db
|
||||
db.add(db_entity)
|
||||
@@ -115,10 +115,7 @@ def stop_entity_health_task(
|
||||
db.refresh(db_entity)
|
||||
|
||||
|
||||
|
||||
def set_attached_by_entity_did(
|
||||
db: Session, entity_did: str, attached: bool
|
||||
) -> None:
|
||||
def set_attached_by_entity_did(db: Session, entity_did: str, attached: bool) -> None:
|
||||
db_entity = get_entity_by_did(db, entity_did)
|
||||
if db_entity is None:
|
||||
raise ClanError(f"Entity with did '{entity_did}' not found")
|
||||
|
||||
Reference in New Issue
Block a user