generated from Luis/nextjs-python-web-template
Added usage field to service
This commit is contained in:
@@ -7,21 +7,101 @@ from sqlalchemy.sql.expression import true
|
||||
from ..errors import ClanError
|
||||
from . import schemas, sql_models
|
||||
|
||||
|
||||
#########################
|
||||
# #
|
||||
# service #
|
||||
# #
|
||||
#########################
|
||||
|
||||
|
||||
def create_service(db: Session, service: schemas.ServiceCreate) -> sql_models.Service:
|
||||
db_service = sql_models.Service(**service.dict())
|
||||
if get_entity_by_did(db, service.entity_did) is None:
|
||||
raise ClanError(f"Entity with did '{service.entity_did}' not found")
|
||||
if get_service_by_uuid(db, service.uuid) is not None:
|
||||
raise ClanError(f"Service with uuid '{service.uuid}' already exists")
|
||||
db_service = sql_models.Service(
|
||||
uuid=service.uuid,
|
||||
service_name=service.service_name,
|
||||
service_type=service.service_type,
|
||||
endpoint_url=service.endpoint_url,
|
||||
status=service.status,
|
||||
other=service.other,
|
||||
entity_did=service.entity_did,
|
||||
)
|
||||
db_usage = []
|
||||
for usage in service.usage:
|
||||
db_usage.append(
|
||||
sql_models.ServiceUsage(
|
||||
times_consumed=usage.times_consumed,
|
||||
consumer_entity_did=usage.consumer_entity_did,
|
||||
)
|
||||
)
|
||||
db_service.usage = db_usage
|
||||
db.add(db_service)
|
||||
db.commit()
|
||||
db.refresh(db_service)
|
||||
return db_service
|
||||
|
||||
|
||||
def set_service_usage(
|
||||
db: Session, service_uuid: str, usages: List[schemas.ServiceUsageCreate]
|
||||
) -> sql_models.Service:
|
||||
db_service = get_service_by_uuid(db, service_uuid)
|
||||
if db_service is None:
|
||||
raise ClanError(f"Service with uuid '{service_uuid}' not found")
|
||||
db_usage = []
|
||||
for usage in usages:
|
||||
db_usage.append(
|
||||
sql_models.ServiceUsage(
|
||||
times_consumed=usage.times_consumed,
|
||||
consumer_entity_did=usage.consumer_entity_did,
|
||||
)
|
||||
)
|
||||
db_service.usage = db_usage
|
||||
db.add(db_service)
|
||||
db.commit()
|
||||
db.refresh(db_service)
|
||||
return db_service
|
||||
|
||||
|
||||
def add_service_usage(
|
||||
db: Session, service_uuid: str, usage: schemas.ServiceUsageCreate
|
||||
) -> sql_models.Service:
|
||||
db_service = get_service_by_uuid(db, service_uuid)
|
||||
if db_service is None:
|
||||
raise ClanError(f"Service with uuid '{service_uuid}' not found")
|
||||
db_service.usage.append(
|
||||
sql_models.ServiceUsage(
|
||||
times_consumed=usage.times_consumed,
|
||||
consumer_entity_did=usage.consumer_entity_did,
|
||||
)
|
||||
)
|
||||
db.add(db_service)
|
||||
db.commit()
|
||||
db.refresh(db_service)
|
||||
return db_service
|
||||
|
||||
|
||||
def increment_service_usage(
|
||||
db: Session, service_uuid: str, consumer_entity_did: str
|
||||
) -> sql_models.ServiceUsage:
|
||||
db_service = get_service_by_uuid(db, service_uuid)
|
||||
if db_service is None:
|
||||
raise ClanError(f"Service with uuid '{service_uuid}' not found")
|
||||
# TODO: Make a query for this
|
||||
for usage in db_service.usage:
|
||||
if usage.consumer_entity_did == consumer_entity_did:
|
||||
usage.times_consumed += 1
|
||||
break
|
||||
db.add(db_service)
|
||||
db.commit()
|
||||
db.refresh(db_service)
|
||||
return db_service
|
||||
|
||||
|
||||
def get_service_by_uuid(db: Session, uuid: str) -> Optional[sql_models.Service]:
|
||||
return db.query(sql_models.Service).filter(sql_models.Service.uuid == uuid).first()
|
||||
|
||||
|
||||
def get_services(
|
||||
db: Session, skip: int = 0, limit: int = 100
|
||||
) -> List[sql_models.Service]:
|
||||
@@ -65,6 +145,10 @@ def get_services_without_entity_id(
|
||||
# #
|
||||
#########################
|
||||
def create_entity(db: Session, entity: schemas.EntityCreate) -> sql_models.Entity:
|
||||
if get_entity_by_did(db, entity.did) is not None:
|
||||
raise ClanError(f"Entity with did '{entity.did}' already exists")
|
||||
if get_entity_by_name_or_did(db, entity.name) is not None:
|
||||
raise ClanError(f"Entity with name '{entity.name}' already exists")
|
||||
db_entity = sql_models.Entity(
|
||||
did=entity.did,
|
||||
name=entity.name,
|
||||
@@ -142,7 +226,9 @@ def get_attached_entities(
|
||||
|
||||
|
||||
# Returns same entity if setting didnt changed something
|
||||
def set_stop_health_task(db: Session, entity_did: str, value: bool) -> None:
|
||||
def set_stop_health_task(
|
||||
db: Session, entity_did: str, value: bool
|
||||
) -> sql_models.Entity:
|
||||
db_entity = get_entity_by_did(db, entity_did)
|
||||
if db_entity is None:
|
||||
raise ClanError(f"Entity with did '{entity_did}' not found")
|
||||
@@ -152,9 +238,13 @@ def set_stop_health_task(db: Session, entity_did: str, value: bool) -> None:
|
||||
# save changes in db
|
||||
db.add(db_entity)
|
||||
db.commit()
|
||||
db.refresh(db_entity)
|
||||
return db_entity
|
||||
|
||||
|
||||
def set_attached_by_entity_did(db: Session, entity_did: str, attached: bool) -> None:
|
||||
def set_attached_by_entity_did(
|
||||
db: Session, entity_did: str, attached: bool
|
||||
) -> sql_models.Entity:
|
||||
db_entity = get_entity_by_did(db, entity_did)
|
||||
if db_entity is None:
|
||||
raise ClanError(f"Entity with did '{entity_did}' not found")
|
||||
@@ -164,6 +254,8 @@ def set_attached_by_entity_did(db: Session, entity_did: str, attached: bool) ->
|
||||
# save changes in db
|
||||
db.add(db_entity)
|
||||
db.commit()
|
||||
db.refresh(db_entity)
|
||||
return db_entity
|
||||
|
||||
|
||||
def delete_entity_by_did(db: Session, did: str) -> None:
|
||||
|
||||
Reference in New Issue
Block a user