georgs5 (#31)
All checks were successful
assets1 / test (push) Successful in 23s
checks-impure / test (push) Successful in 32s
checks / test (push) Successful in 1m13s

Reviewed-on: #31
This commit was merged in pull request #31.
This commit is contained in:
Georg-Stahn
2023-12-04 22:22:22 +01:00
parent f3096075d2
commit 171582e2e7
4 changed files with 104 additions and 4 deletions

View File

@@ -33,6 +33,8 @@ async def lifespan(app: FastAPI) -> Any:
def setup_app() -> FastAPI:
# bind sql engine
# TODO comment aut and add flag to run with pupulated data rm *.sql run pytest with marked then start clan webui
# https://docs.pytest.org/en/7.1.x/example/markers.html
sql_models.Base.metadata.drop_all(engine)
sql_models.Base.metadata.create_all(bind=engine)

View File

@@ -1,6 +1,8 @@
import time
from typing import List, Optional
from fastapi import APIRouter, Depends
import httpx
from fastapi import APIRouter, BackgroundTasks, Depends
from sqlalchemy.orm import Session
from .. import sql_crud, sql_db, sql_models
@@ -139,8 +141,11 @@ def get_repository(
@router.post("/api/v1/create_entity", response_model=Entity, tags=[Tags.entities])
def create_entity(
entity: EntityCreate, db: Session = Depends(sql_db.get_db)
) -> EntityCreate:
) -> EntityCreate | str:
# todo checken ob schon da ...
if sql_crud.get_entity_by_did(db, did=entity.did):
print("did already exsists")
return "Error did already exsists in db"
return sql_crud.create_entity(db, entity)
@@ -155,9 +160,67 @@ def get_entities(
@router.get("/api/v1/get_entity", response_model=Optional[Entity], tags=[Tags.entities])
def get_entity(
entity_did: str = "did:sov:test:1234",
skip: int = 0,
limit: int = 100,
db: Session = Depends(sql_db.get_db),
) -> Optional[sql_models.Entity]:
entity = sql_crud.get_entity_by_did(db, did=entity_did)
return entity
@router.get(
"/api/v1/get_attached_entities",
response_model=List[Entity],
tags=[Tags.entities],
)
def get_attached_entities(
skip: int = 0, limit: int = 100, db: Session = Depends(sql_db.get_db)
) -> List[sql_models.Entity]:
entities = sql_crud.get_attached_entities(db, skip=skip, limit=limit)
return entities
@router.get("/api/v1/detach")
async def detach(
background_tasks: BackgroundTasks,
entity_did: str = "did:sov:test:1234",
skip: int = 0,
limit: int = 100,
db: Session = Depends(sql_db.get_db),
) -> dict[str, str]:
background_tasks.add_task(
sql_crud.set_attached_by_entity_did, db, entity_did, False
)
return {"message": "Detaching in the background"}
@router.get("/api/v1/attach")
async def attach(
background_tasks: BackgroundTasks,
entity_did: str = "did:sov:test:1234",
skip: int = 0,
limit: int = 100,
db: Session = Depends(sql_db.get_db),
) -> dict[str, str]:
background_tasks.add_task(attach_entity, db, entity_did)
return {"message": "Attaching in the background"}
# TODO
def attach_entity(db: Session, entity_did: str) -> None:
db_entity = sql_crud.set_attached_by_entity_did(db, entity_did, True)
try:
if db_entity is not None:
while db_entity.attached:
# query status endpoint
# https://www.python-httpx.org/
response = httpx.get(f"http://{db_entity.ip}", timeout=2)
print(response)
# test with:
# while true ; do printf 'HTTP/1.1 200 OK\r\n\r\ncool, thanks' | nc -l -N localhost 5555 ; done
# client test (apt install python3-httpx):
# httpx http://localhost:5555
# except not reached set false
time.sleep(1)
except Exception as e:
print(e)
if db_entity is not None:
db_entity = sql_crud.set_attached_by_entity_did(db, entity_did, False)

View File

@@ -1,6 +1,7 @@
from typing import List, Optional
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import true
from . import schemas, sql_models
@@ -136,3 +137,37 @@ def get_entities(
def get_entity_by_did(db: Session, did: str) -> Optional[sql_models.Entity]:
return db.query(sql_models.Entity).filter(sql_models.Entity.did == did).first()
# get attached
def get_attached_entities(
db: Session, skip: int = 0, limit: int = 100
) -> List[sql_models.Entity]:
return (
db.query(sql_models.Entity)
.filter(sql_models.Entity.attached == true())
# https://stackoverflow.com/questions/18998010/flake8-complains-on-boolean-comparison-in-filter-clause
.offset(skip)
.limit(limit)
.all()
)
# set attached
# None if did not found
# Returns same entity if setting didnt changed something
def set_attached_by_entity_did(
db: Session, entity_did: str, value: bool
) -> Optional[sql_models.Entity]:
# ste attached to true
db_entity = get_entity_by_did(db, entity_did)
if db_entity is not None:
# db_entity.attached = Column(True)
setattr(db_entity, "attached", value)
# save changes in db
db.add(db_entity)
db.commit()
db.refresh(db_entity)
return db_entity
else:
return db_entity