generated from Luis/nextjs-python-web-template
300 lines
9.0 KiB
Python
300 lines
9.0 KiB
Python
from typing import List, Optional
|
|
|
|
from sqlalchemy import func
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy.sql.expression import true
|
|
|
|
from ..errors import ClanError
|
|
from . import schemas, sql_models
|
|
|
|
|
|
#########################
|
|
# #
|
|
# Service #
|
|
# #
|
|
#########################
|
|
def create_service(db: Session, service: schemas.ServiceCreate) -> sql_models.Service:
    """Store a new service together with its usage records.

    Args:
        db: Session used for the lookups and the insert.
        service: Payload describing the service and its initial usage list.

    Returns:
        The stored service row, refreshed after the commit.

    Raises:
        ClanError: If no entity with ``service.entity_did`` exists, or if a
            service with ``service.uuid`` is already stored.
    """
    owner = get_entity_by_did(db, service.entity_did)
    if owner is None:
        raise ClanError(f"Entity with did '{service.entity_did}' not found")

    duplicate = get_service_by_uuid(db, service.uuid)
    if duplicate is not None:
        raise ClanError(f"Service with uuid '{service.uuid}' already exists")

    new_service = sql_models.Service(
        uuid=service.uuid,
        service_name=service.service_name,
        service_type=service.service_type,
        endpoint_url=service.endpoint_url,
        status=service.status,
        other=service.other,
        entity_did=service.entity_did,
    )
    # One ServiceUsage row per usage entry in the payload.
    new_service.usage = [
        sql_models.ServiceUsage(
            times_consumed=entry.times_consumed,
            consumer_entity_did=entry.consumer_entity_did,
        )
        for entry in service.usage
    ]

    db.add(new_service)
    db.commit()
    db.refresh(new_service)
    return new_service
|
|
|
|
|
|
def set_service_usage(
    db: Session, service_uuid: str, usages: List[schemas.ServiceUsageCreate]
) -> sql_models.Service:
    """Replace the usage list of an existing service.

    Args:
        db: Session used for the lookup and the update.
        service_uuid: uuid of the service to modify.
        usages: Usage entries that become the service's new usage list.

    Returns:
        The updated service row, refreshed after the commit.

    Raises:
        ClanError: If no service with ``service_uuid`` exists.
    """
    target = get_service_by_uuid(db, service_uuid)
    if target is None:
        raise ClanError(f"Service with uuid '{service_uuid}' not found")

    # Assigning a fresh list overwrites whatever usage rows were attached.
    target.usage = [
        sql_models.ServiceUsage(
            times_consumed=entry.times_consumed,
            consumer_entity_did=entry.consumer_entity_did,
        )
        for entry in usages
    ]

    db.add(target)
    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
def add_service_usage(
    db: Session, service_uuid: str, usage: schemas.ServiceUsageCreate
) -> sql_models.Service:
    """Append one usage record to an existing service.

    Args:
        db: Session used for the lookup and the update.
        service_uuid: uuid of the service to modify.
        usage: Usage entry to append to the service's usage list.

    Returns:
        The updated service row, refreshed after the commit.

    Raises:
        ClanError: If no service with ``service_uuid`` exists.
    """
    target = get_service_by_uuid(db, service_uuid)
    if target is None:
        raise ClanError(f"Service with uuid '{service_uuid}' not found")

    usage_rows = target.usage
    new_row = sql_models.ServiceUsage(
        times_consumed=usage.times_consumed,
        consumer_entity_did=usage.consumer_entity_did,
    )
    usage_rows.append(new_row)

    db.add(target)
    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
def increment_service_usage(
    db: Session, service_uuid: str, consumer_entity_did: str
) -> sql_models.Service:
    """Increase the consumption counter of one consumer of a service by one.

    Only the first usage record whose ``consumer_entity_did`` matches is
    incremented. If the service has no usage record for that consumer,
    nothing is changed and no error is raised.

    Args:
        db: Session used for the lookup and the update.
        service_uuid: uuid of the service whose usage is updated.
        consumer_entity_did: did of the consuming entity.

    Returns:
        The service row (not the individual usage record), refreshed after
        the commit.

    Raises:
        ClanError: If no service with ``service_uuid`` exists.
    """
    db_service = get_service_by_uuid(db, service_uuid)
    if db_service is None:
        raise ClanError(f"Service with uuid '{service_uuid}' not found")

    # TODO: Make a query for this
    matching_usage = next(
        (
            usage
            for usage in db_service.usage
            if usage.consumer_entity_did == consumer_entity_did
        ),
        None,
    )
    if matching_usage is not None:
        matching_usage.times_consumed += 1

    db.add(db_service)
    db.commit()
    db.refresh(db_service)
    return db_service
|
|
|
|
|
|
def get_service_by_uuid(db: Session, uuid: str) -> Optional[sql_models.Service]:
    """Return the first service whose uuid equals ``uuid``, or ``None``."""
    services = db.query(sql_models.Service)
    matching = services.filter(sql_models.Service.uuid == uuid)
    return matching.first()
|
|
|
|
|
|
def get_services(
    db: Session, skip: int = 0, limit: int = 100
) -> List[sql_models.Service]:
    """Return one page of services.

    Args:
        db: Session used for the query.
        skip: Row offset applied to the query.
        limit: Maximum number of rows requested.

    No explicit ordering is applied to the query.
    """
    page = db.query(sql_models.Service).offset(skip).limit(limit)
    return page.all()
|
|
|
|
|
|
def get_services_by_entity_did(
    db: Session, entity_did: str, skip: int = 0, limit: int = 100
) -> List[sql_models.Service]:
    """Return one page of the services whose ``entity_did`` equals the given did.

    Args:
        db: Session used for the query.
        entity_did: did the services' ``entity_did`` column must equal.
        skip: Row offset applied to the query.
        limit: Maximum number of rows requested.
    """
    owned_by_entity = sql_models.Service.entity_did == entity_did
    page = db.query(sql_models.Service).filter(owned_by_entity).offset(skip).limit(limit)
    return page.all()
|
|
|
|
|
|
def delete_service_by_entity_did(db: Session, entity_did: str) -> None:
    """Delete every service whose ``entity_did`` equals the given did and commit."""
    owned_by_entity = sql_models.Service.entity_did == entity_did
    doomed = db.query(sql_models.Service).filter(owned_by_entity)
    # Query-level delete: the rows are removed without being loaded first.
    doomed.delete()
    db.commit()
|
|
|
|
|
|
def get_services_without_entity_id(
    db: Session, entity_did: str, skip: int = 0, limit: int = 100
) -> List[sql_models.Service]:
    """Return one page of the services whose ``entity_did`` differs from the given did.

    Args:
        db: Session used for the query.
        entity_did: did the services' ``entity_did`` column must not equal.
        skip: Row offset applied to the query.
        limit: Maximum number of rows requested.
    """
    owned_by_other = sql_models.Service.entity_did != entity_did
    page = db.query(sql_models.Service).filter(owned_by_other).offset(skip).limit(limit)
    return page.all()
|
|
|
|
|
|
#########################
|
|
# #
|
|
# Entity #
|
|
# #
|
|
#########################
|
|
def create_entity(db: Session, entity: schemas.EntityCreate) -> sql_models.Entity:
    """Store a new entity together with its role rows.

    New entities always start with ``attached`` and ``stop_health_task`` set
    to ``False``.

    Args:
        db: Session used for the lookups and the insert.
        entity: Payload describing the entity and its roles.

    Returns:
        The stored entity row, refreshed after the commit.

    Raises:
        ClanError: If an entity with the same did exists, or if
            ``entity.name`` equals the name or the did of an existing entity.
    """
    same_did = get_entity_by_did(db, entity.did)
    if same_did is not None:
        raise ClanError(f"Entity with did '{entity.did}' already exists")

    # The lookup matches on name OR did, so a name that collides with an
    # existing did is rejected as well.
    same_name = get_entity_by_name_or_did(db, entity.name)
    if same_name is not None:
        raise ClanError(f"Entity with name '{entity.name}' already exists")

    new_entity = sql_models.Entity(
        did=entity.did,
        name=entity.name,
        ip=entity.ip,
        network=entity.network,
        visible=entity.visible,
        other=entity.other,
        attached=False,
        stop_health_task=False,
    )
    new_entity.roles = [
        sql_models.EntityRoles(role=granted) for granted in entity.roles
    ]

    db.add(new_entity)
    db.commit()
    db.refresh(new_entity)
    return new_entity
|
|
|
|
|
|
def get_entities(
    db: Session, skip: int = 0, limit: int = 100
) -> List[sql_models.Entity]:
    """Return one page of entities.

    Args:
        db: Session used for the query.
        skip: Row offset applied to the query.
        limit: Maximum number of rows requested.

    No explicit ordering is applied to the query.
    """
    page = db.query(sql_models.Entity).offset(skip).limit(limit)
    return page.all()
|
|
|
|
|
|
def get_entity_by_did(db: Session, did: str) -> Optional[sql_models.Entity]:
    """Return the first entity whose did equals ``did``, or ``None``."""
    entities = db.query(sql_models.Entity)
    matching = entities.filter(sql_models.Entity.did == did)
    return matching.first()
|
|
|
|
|
|
def get_entity_by_name_or_did(db: Session, name: str) -> Optional[sql_models.Entity]:
    """Return the first entity whose name or did equals ``name``, or ``None``."""
    name_matches = sql_models.Entity.name == name
    did_matches = sql_models.Entity.did == name
    candidates = db.query(sql_models.Entity).filter(name_matches | did_matches)
    return candidates.first()
|
|
|
|
|
|
def get_entity_by_role(
    db: Session, roles: List[schemas.Role]
) -> List[sql_models.Entity]:
    """Return every entity that holds all of the requested roles.

    Repeated values in ``roles`` are treated as a single role. An empty
    ``roles`` list yields an empty result.

    Args:
        db: Session used for the query.
        roles: Roles an entity must all have to be included.

    Returns:
        The matching entities; no explicit ordering is applied.
    """
    # Collapse duplicates while keeping order. The role count below is
    # compared against the number of *different* roles requested; comparing
    # against a length that includes duplicates would never match.
    wanted_roles: List[schemas.Role] = []
    for role in roles:
        if role not in wanted_roles:
            wanted_roles.append(role)

    if not wanted_roles:
        # An empty IN () matches no role row, so no entity can qualify.
        return []

    # Per entity, count how many different requested roles it holds.
    # DISTINCT keeps a role stored twice for one entity from being counted
    # twice.
    role_counts = (
        db.query(
            sql_models.EntityRoles.entity_did,
            func.count(sql_models.EntityRoles.role.distinct()).label("role_count"),
        )
        .filter(sql_models.EntityRoles.role.in_(wanted_roles))
        .group_by(sql_models.EntityRoles.entity_did)
        .subquery()
    )

    # Keep only entities that hold every requested role.
    return (
        db.query(sql_models.Entity)
        .join(role_counts, sql_models.Entity.did == role_counts.c.entity_did)
        .filter(role_counts.c.role_count == len(wanted_roles))
        .all()
    )
|
|
|
|
|
|
# get attached
|
|
def get_attached_entities(
    db: Session, skip: int = 0, limit: int = 100
) -> List[sql_models.Entity]:
    """Return one page of the entities whose ``attached`` column is true.

    Args:
        db: Session used for the query.
        skip: Row offset applied to the query.
        limit: Maximum number of rows requested.
    """
    # Compared against the SQL ``true()`` construct instead of Python ``True``:
    # https://stackoverflow.com/questions/18998010/flake8-complains-on-boolean-comparison-in-filter-clause
    is_attached = sql_models.Entity.attached == true()
    page = db.query(sql_models.Entity).filter(is_attached).offset(skip).limit(limit)
    return page.all()
|
|
|
|
|
|
# Returns the same entity if the setting did not change anything
|
|
def set_stop_health_task(
    db: Session, entity_did: str, value: bool
) -> sql_models.Entity:
    """Set the ``stop_health_task`` flag of an entity and commit.

    Args:
        db: Session used for the lookup and the update.
        entity_did: did of the entity to modify.
        value: New value of the flag.

    Returns:
        The entity row, refreshed after the commit.

    Raises:
        ClanError: If no entity with ``entity_did`` exists.
    """
    target = get_entity_by_did(db, entity_did)
    if target is None:
        raise ClanError(f"Entity with did '{entity_did}' not found")

    target.stop_health_task = value  # type: ignore

    # Write the change to the database and reload the row.
    db.add(target)
    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
def set_attached_by_entity_did(
    db: Session, entity_did: str, attached: bool
) -> sql_models.Entity:
    """Set the ``attached`` flag of an entity and commit.

    Args:
        db: Session used for the lookup and the update.
        entity_did: did of the entity to modify.
        attached: New value of the flag.

    Returns:
        The entity row, refreshed after the commit.

    Raises:
        ClanError: If no entity with ``entity_did`` exists.
    """
    target = get_entity_by_did(db, entity_did)
    if target is None:
        raise ClanError(f"Entity with did '{entity_did}' not found")

    target.attached = attached  # type: ignore

    # Write the change to the database and reload the row.
    db.add(target)
    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
def delete_entity_by_did(db: Session, did: str) -> None:
    """Delete every entity whose did equals ``did`` and commit."""
    has_did = sql_models.Entity.did == did
    doomed = db.query(sql_models.Entity).filter(has_did)
    # Query-level delete: the rows are removed without being loaded first.
    doomed.delete()
    db.commit()
|
|
|
|
|
|
def delete_entity_by_did_recursive(db: Session, did: str) -> None:
    """Delete an entity's services first, then the entity itself.

    Each of the two helper calls commits on its own, so the removal happens
    in two separate commits.
    """
    # Services go first, then the entity whose did they carry.
    for remove in (delete_service_by_entity_did, delete_entity_by_did):
        remove(db, did)
|
|
|
|
|
|
#########################
|
|
# #
|
|
# Eventmessage #
|
|
# #
|
|
#########################
|
|
|
|
|
|
def create_eventmessage(
    db: Session, eventmsg: schemas.EventmessageCreate
) -> sql_models.Eventmessage:
    """Store a new event message and return the stored row.

    Args:
        db: Session used for the insert.
        eventmsg: Payload whose fields are copied onto the new row.

    Returns:
        The stored event message, refreshed after the commit.
    """
    stored = sql_models.Eventmessage(
        timestamp=eventmsg.timestamp,
        group=eventmsg.group,
        group_id=eventmsg.group_id,
        msg_type=eventmsg.msg_type,
        src_did=eventmsg.src_did,
        des_did=eventmsg.des_did,
        msg=eventmsg.msg,
    )

    db.add(stored)
    db.commit()
    db.refresh(stored)
    return stored
|
|
|
|
|
|
def get_eventmessages(
    db: Session, skip: int = 0, limit: int = 100
) -> List[sql_models.Eventmessage]:
    """Return one page of event messages.

    Args:
        db: Session used for the query.
        skip: Row offset applied to the query.
        limit: Maximum number of rows requested.

    No explicit ordering is applied to the query.
    """
    page = db.query(sql_models.Eventmessage).offset(skip).limit(limit)
    return page.all()
|