Merge pull request 'push first sql setup try' (#18) from Luis-Hebendanz-georgs into main
All checks were successful
checks-impure / test (push) Successful in 1m14s
assets1 / test (push) Successful in 30s
checks / test (push) Successful in 2m59s

This commit was merged in pull request #18.
This commit is contained in:
2023-11-27 00:00:03 +01:00
11 changed files with 331 additions and 17 deletions

2
.gitignore vendored
View File

@@ -1,4 +1,5 @@
.direnv
sql_app.db
.coverage.*
**/qubeclan
**/testdir
@@ -14,6 +15,7 @@ __pycache__
.coverage
.mypy_cache
.pytest_cache
pkgs.pyproj
.reports
.ruff_cache
htmlcov

View File

@@ -1,14 +0,0 @@
from enum import Enum
from pydantic import BaseModel
class Status(Enum):
ONLINE = "online"
OFFLINE = "offline"
UNKNOWN = "unknown"
class Machine(BaseModel):
name: str
status: Status

View File

@@ -2,15 +2,19 @@ import logging
from contextlib import asynccontextmanager
from typing import Any
# import for sql
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.routing import APIRoute
from fastapi.staticfiles import StaticFiles
from ..errors import ClanError
from . import sql_models
from .assets import asset_path
from .error_handlers import clan_error_handler
from .routers import health, root, socket_manager2
from .routers import health, root, socket_manager2, sql_connect # sql router hinzufügen
from .sql_db import engine
from .tags import tags_metadata
origins = [
"http://localhost:3000",
@@ -27,6 +31,10 @@ async def lifespan(app: FastAPI) -> Any:
def setup_app() -> FastAPI:
# bind sql engine
sql_models.Base.metadata.drop_all(engine)
sql_models.Base.metadata.create_all(bind=engine)
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
@@ -37,6 +45,8 @@ def setup_app() -> FastAPI:
)
app.include_router(health.router)
# sql methodes
app.include_router(sql_connect.router)
app.include_router(socket_manager2.router)
@@ -46,6 +56,9 @@ def setup_app() -> FastAPI:
app.mount("/static", StaticFiles(directory=asset_path()), name="static")
# Add tag descriptions to the OpenAPI schema
app.openapi_tags = tags_metadata
for route in app.routes:
if isinstance(route, APIRoute):
route.operation_id = route.name # in this case, 'read_items'

View File

@@ -1,8 +1,13 @@
from fastapi import APIRouter
from ..schemas import Machine, Status
router = APIRouter()
@router.get("/health", include_in_schema=True)
async def health() -> str:
return "OK"
async def health() -> Machine: # str:
return Machine(name="test", status=Status.ONLINE)
# return "OK"

View File

@@ -0,0 +1,42 @@
from typing import List
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from .. import sql_crud, sql_db, sql_models
from ..schemas import Entity, EntityCreate
from ..tags import Tags
router = APIRouter()
# @router.get("/api/v1/get_producers", response_model=List[Producer], tags=[Tags.producers])
# def get_producers(
# skip: int = 0, limit: int = 100, db: Session = Depends(sql_db.get_db)
# ) -> List[sql_models.Producer]:
# producers = sql_crud.get_producers(db, skip=skip, limit=limit)
# return producers
# @router.post("/api/v1/create_producer", response_model=Producer, tags=[Tags.producers])
# def create_producer(
# producer: ProducerCreate, db: Session = Depends(sql_db.get_db)
# ) -> Producer:
# # todo checken ob schon da ...
# return sql_crud.create_producer(db=db, producer=producer)
@router.post("/api/v1/create_entity", response_model=Entity, tags=[Tags.entities])
def create_entity(
entity: EntityCreate, db: Session = Depends(sql_db.get_db)
) -> EntityCreate:
# todo checken ob schon da ...
return sql_crud.create_entity(db, entity)
@router.get("/api/v1/get_entities", response_model=List[Entity], tags=[Tags.entities])
def get_entities(
skip: int = 0, limit: int = 100, db: Session = Depends(sql_db.get_db)
) -> List[sql_models.Entity]:
entities = sql_crud.get_entities(db, skip=skip, limit=limit)
return entities

View File

@@ -0,0 +1,71 @@
from enum import Enum
from pydantic import BaseModel, Field
class Status(Enum):
ONLINE = "online"
OFFLINE = "offline"
UNKNOWN = "unknown"
class Machine(BaseModel):
name: str
status: Status
class EntityBase(BaseModel):
did: str = "did:sov:test:1234"
name: str = "C1"
ip: str = "127.0.0.1"
attached: bool = False
other: dict = {"test": "test"}
class Entity(EntityBase):
class Config:
orm_mode = True
class EntityCreate(EntityBase):
pass
class RepositoryBase(BaseModel):
title: str
description: str | None = None
class RepositoryCreate(RepositoryBase):
pass
class Repository(RepositoryBase):
id: int
prod_id: str
class Config:
orm_mode = True
class ProducerBase(BaseModel):
id: int
class ProducerCreate(ProducerBase):
jsonblob: int = Field(
42,
title="The Json",
description="this is the value of json",
gt=30,
lt=50,
list=[1, 2, "3"],
)
class Producer(ProducerBase):
id: int
repos: list[Repository] = []
class Config:
orm_mode = True

Binary file not shown.

View File

@@ -0,0 +1,52 @@
from typing import List
from sqlalchemy.orm import Session
from . import schemas, sql_models
def create_entity(db: Session, entity: schemas.EntityCreate) -> sql_models.Entity:
db_entity = sql_models.Entity(**entity.dict())
db.add(db_entity)
db.commit()
db.refresh(db_entity)
return db_entity
def get_entities(
db: Session, skip: int = 0, limit: int = 100
) -> List[sql_models.Entity]:
return db.query(sql_models.Entity).offset(skip).limit(limit).all()
# def get_producers(
# db: Session, skip: int = 0, limit: int = 100
# ) -> List[sql_models.Producer]:
# return db.query(sql_models.Producer).offset(skip).limit(limit).all()
# def create_producer(
# db: Session, producer: schemas.ProducerCreate
# ) -> sql_models.Producer:
# jsonblob_init = {"test_repo": "jsonblob_create"}
# db_producer = sql_models.Producer(jsonblob=jsonblob_init)
# db.add(db_producer)
# db.commit()
# db.refresh(db_producer)
# return db_producer
# def get_repositories(
# db: Session, skip: int = 0, limit: int = 100
# ) -> List[sql_models.Repository]:
# return db.query(sql_models.Repository).offset(skip).limit(limit).all()
# def create_repository(
# db: Session, repository: schemas.RepositoryCreate, producers_id: int
# ) -> sql_models.Repository:
# db_repository = sql_models.Repository(**repository.dict(), prod_id=producers_id)
# db.add(db_repository)
# db.commit()
# db.refresh(db_repository)
# return db_repository

View File

@@ -0,0 +1,21 @@
from typing import Generator
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, sessionmaker
URL = "sqlite:///./sql_app.db"
engine = create_engine(URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Dependency to start a clean thread of the db
def get_db() -> Generator[Session, None, None]:
db = SessionLocal()
try:
yield db
finally:
db.close()

View File

@@ -0,0 +1,90 @@
from sqlalchemy import JSON, Boolean, Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .sql_db import Base
# Relationsship example
# https://dev.to/freddiemazzilli/flask-sqlalchemy-relationships-exploring-relationship-associations-igo
class Entity(Base):
__tablename__ = "entities"
## Queryable body ##
did = Column(String, primary_key=True, index=True)
name = Column(String, index=True)
ip = Column(String, index=True)
attached = Column(Boolean, index=True)
## Non queryable body ##
# In here we deposit: Network, Roles, Visible, etc.
other = Column(JSON)
## Relations ##
producers = relationship("Producer", back_populates="entity")
consumers = relationship("Consumer", back_populates="entity")
# repository = relationship("Repository", uselist=False, back_populates="entity")
class ProducerAbstract(Base):
__abstract__ = True
# Queryable body
id = Column(Integer, primary_key=True, index=True)
service_name = Column(String, unique=True, index=True)
service_type = Column(String, index=True)
endpoint_url = Column(String, index=True)
status = Column(String, index=True)
## Non queryable body ##
# In here we deposit: Action
other = Column(JSON)
class Producer(ProducerAbstract):
__tablename__ = "producers"
# Usage is the consumers column
## Relations ##
# One entity can have many producers
entity = relationship("Entity", back_populates="producers")
entity_did = Column(Integer, ForeignKey("entities.did"))
# One producer has many consumers
consumers = relationship("Consumer", back_populates="producer")
class Consumer(Base):
__tablename__ = "consumers"
## Queryable body ##
id = Column(Integer, primary_key=True, index=True)
## Relations ##
# one entity can have many consumers
entity = relationship("Entity", back_populates="consumers")
entity_did = Column(Integer, ForeignKey("entities.did"))
# one consumer has one producer
producer = relationship("Producer", back_populates="consumers")
producer_id = Column(Integer, ForeignKey("producers.id"))
# class Repository(ProducerAbstract):
# __tablename__ = "repositories"
# # one repository has one entity
# entity = relationship("Entity", back_populates="repository")
# entity_did = Column(Integer, ForeignKey("entities.did"))
# TODO: Ask how this works exactly
class Resolution(Base):
__tablename__ = "resolutions"
id = Column(Integer, primary_key=True)
requester_name = Column(String, index=True)
requester_did = Column(String, index=True)
resolved_did = Column(String, index=True)
timestamp = Column(DateTime, index=True)

View File

@@ -0,0 +1,32 @@
from enum import Enum
from typing import Any, Dict, List
class Tags(Enum):
producers = "producers"
consumers = "consumers"
entities = "entities"
repositories = "repositories"
def __str__(self) -> str:
return self.value
tags_metadata: List[Dict[str, Any]] = [
{
"name": str(Tags.producers),
"description": "Operations on a producer.",
},
{
"name": str(Tags.consumers),
"description": "Operations on a consumer.",
},
{
"name": str(Tags.entities),
"description": "Operations on an entity.",
},
{
"name": str(Tags.repositories),
"description": "Operations on a repository.",
},
]