Added create_entity
Some checks failed
checks-impure / test (pull_request) Successful in 30s
checks / test (pull_request) Failing after 3h13m7s

This commit is contained in:
2023-11-26 20:54:57 +01:00
parent 80ccaa83d3
commit d1a3a5381a
8 changed files with 200 additions and 128 deletions

View File

@@ -2,37 +2,51 @@ from typing import List
from sqlalchemy.orm import Session
from . import api_outputs, sql_models
from . import schemas, sql_models
def get_producers(
db: Session, skip: int = 0, limit: int = 100
) -> List[sql_models.Producer]:
return db.query(sql_models.Producer).offset(skip).limit(limit).all()
def create_producer(
db: Session, producer: api_outputs.ProducerCreate
) -> sql_models.Producer:
jsonblob_init = {"test_repo": "jsonblob_create"}
db_producer = sql_models.Producer(jsonblob=jsonblob_init)
db.add(db_producer)
def create_entity(db: Session, entity: schemas.EntityCreate) -> sql_models.Entity:
db_entity = sql_models.Entity(**entity.dict())
db.add(db_entity)
db.commit()
db.refresh(db_producer)
return db_producer
db.refresh(db_entity)
return db_entity
def get_repositories(
def get_entities(
db: Session, skip: int = 0, limit: int = 100
) -> List[sql_models.Repository]:
return db.query(sql_models.Repository).offset(skip).limit(limit).all()
) -> List[sql_models.Entity]:
return db.query(sql_models.Entity).offset(skip).limit(limit).all()
def create_repository(
db: Session, repository: api_outputs.RepositoryCreate, producers_id: int
) -> sql_models.Repository:
db_repository = sql_models.Repository(**repository.dict(), prod_id=producers_id)
db.add(db_repository)
db.commit()
db.refresh(db_repository)
return db_repository
# def get_producers(
# db: Session, skip: int = 0, limit: int = 100
# ) -> List[sql_models.Producer]:
# return db.query(sql_models.Producer).offset(skip).limit(limit).all()
# def create_producer(
# db: Session, producer: schemas.ProducerCreate
# ) -> sql_models.Producer:
# jsonblob_init = {"test_repo": "jsonblob_create"}
# db_producer = sql_models.Producer(jsonblob=jsonblob_init)
# db.add(db_producer)
# db.commit()
# db.refresh(db_producer)
# return db_producer
# def get_repositories(
# db: Session, skip: int = 0, limit: int = 100
# ) -> List[sql_models.Repository]:
# return db.query(sql_models.Repository).offset(skip).limit(limit).all()
# def create_repository(
# db: Session, repository: schemas.RepositoryCreate, producers_id: int
# ) -> sql_models.Repository:
# db_repository = sql_models.Repository(**repository.dict(), prod_id=producers_id)
# db.add(db_repository)
# db.commit()
# db.refresh(db_repository)
# return db_repository