nearly done with attached checker ... todo client.get and right nc -l 5555 call
Some checks failed
checks-impure / test (pull_request) Failing after 25s
checks / test (pull_request) Failing after 1m42s

This commit is contained in:
Georg-Stahn
2023-12-03 22:13:00 +01:00
parent 1142ee1b80
commit 98747bc39b
4 changed files with 44 additions and 27 deletions

View File

@@ -1,6 +1,7 @@
import time
from typing import List, Optional
from fastapi import APIRouter, BackgroundTasks, Depends
from fastapi import APIRouter, BackgroundTasks, Depends, Request
from sqlalchemy.orm import Session
from .. import sql_crud, sql_db, sql_models
@@ -111,7 +112,7 @@ def create_repository(
response_model=List[Repository],
tags=[Tags.repositories],
)
def get_repositories(
def get_repositories(get
skip: int = 0, limit: int = 100, db: Session = Depends(sql_db.get_db)
) -> List[sql_models.Repository]:
repositories = sql_crud.get_repositories(db, skip=skip, limit=limit)
@@ -166,6 +167,28 @@ def get_entity(
return entity
@router.get("/api/v1/get_attached_entities", response_model=Optional[Entity], tags=[Tags.entities])
def get_attached_entities(
skip: int = 0, limit: int = 100, db: Session = Depends(sql_db.get_db)
) -> List[sql_models.Entity]:
entities = sql_crud.get_attached_entities(db, skip=skip, limit=limit)
return entities
@router.get("/api/v1/detach", response_model=Optional[Entity], tags=[Tags.entities])
async def detach(
background_tasks: BackgroundTasks,
entity_did: str = "did:sov:test:1234",
skip: int = 0,
limit: int = 100,
db: Session = Depends(sql_db.get_db),
) -> dict[str, str]:
background_tasks.add_task(
sql_crud.set_attached_by_entity_did, db, entity_did, False
)
return {"message": "Detaching in the background"}
@router.get("/api/v1/attach", response_model=Optional[Entity], tags=[Tags.entities])
async def attach(
background_tasks: BackgroundTasks,
@@ -181,22 +204,16 @@ async def attach(
# TODO
def attach_entity(entity_did: str, db: Session) -> None:
sql_crud.attach_entity_by_did(db, entity_did, True)
while db_entity.attached:
#query status endpoint
#except not reached set false
time.sleep(5)
@router.get("/api/v1/detach", response_model=Optional[Entity], tags=[Tags.entities])
async def detach(
background_tasks: BackgroundTasks,
entity_did: str = "did:sov:test:1234",
skip: int = 0,
limit: int = 100,
db: Session = Depends(sql_db.get_db),
) -> dict[str, str]:
background_tasks.add_task(
sql_crud.set_attached_by_entity_did, db, entity_did, False
)
return {"message": "Detaching in the background"}
db_entity = sql_crud.set_attached_by_entity_did(db, entity_did, True)
try:
while db_entity.attached:
#query status endpoint
subprocess.run(f'curl http://{db_entity.ip}')
# test with: while true; do { echo -e 'HTTP/1.1 200 OK\r\n'; echo '<html>test</html>'; } | nc -l 5556; done
#except not reached set false
time.sleep(1)
except Exception as e:
raise e
#async with httpx.AsyncClient() as client:
# r =await client.get