import random import uuid from openapi_client import ApiClient from openapi_client.api import DefaultApi from openapi_client.api.entities_api import EntitiesApi from openapi_client.api.services_api import ServicesApi from openapi_client.models import ( Entity, EntityCreate, Machine, ServiceCreate, Status, ) random.seed(42) num_uuids = 100 uuids = [str(uuid.UUID(int=random.getrandbits(128))) for i in range(num_uuids)] def test_health(api_client: ApiClient) -> None: default = DefaultApi(api_client=api_client) res: Machine = default.health() assert res.status == Status.ONLINE def test_entities_empty(api_client: ApiClient) -> None: entity = EntitiesApi(api_client=api_client) res = entity.get_all_entities() assert res == [] def create_entities(num: int = 10) -> list[EntityCreate]: res = [] for i in range(num): en = EntityCreate( did=f"did:sov:test:12{i}", name=f"C{i}", ip=f"127.0.0.1:{7000+i}", visible=True, other={}, ) res.append(en) return res def create_service(idx: int, entity: Entity) -> ServiceCreate: se = ServiceCreate( uuid=uuids[idx], service_name=f"Carlos Printing{idx}", service_type="3D Printing", endpoint_url=f"{entity.ip}/v1/print_daemon{idx}", status="unknown", other={"action": ["register", "deregister", "delete", "create"]}, entity_did=entity.did, ) return se def test_create_entities(api_client: ApiClient) -> None: api = EntitiesApi(api_client=api_client) for own_entity in create_entities(): res: Entity = api.create_entity(own_entity) assert res.did == own_entity.did assert res.attached is False def test_producer2(api: TestClient) -> None: request_body = { "uuid": "8e285c0c-4e40-430a-a477-26b3b81e30d1", "service_name": "Luis'''s Fax", "service_type": "Fax", "endpoint_url": "http://127.0.0.1:8001", "status": "unknown", "other": {"faxen": "dicke"}, "entity_did": default_entity_did2, } paramter = "producer" get_request = "entity_did="+url.quote(default_entity_did2) make_test_post_and_get(api, request_body, paramter, get_request) def test_producer3(api: TestClient) -> None: request_body = { "uuid": "8e285c0c-4e40-430a-a477-26b3b81e30d2", "service_name": "Erdem'''s VR-Stream", "service_type": "VR-Stream", "endpoint_url": "http://127.0.0.1:8002", "status": "unknown", "other": {"oculos": "rift"}, "entity_did": default_entity_did3, } paramter = "producer" get_request = "entity_did="+url.quote(default_entity_did3) make_test_post_and_get(api, request_body, paramter, get_request) def test_producer4(api: TestClient) -> None: request_body = { "uuid": "8e285c0c-4e40-430a-a477-26b3b81e30d3", "service_name": "Onur'''s gallary", "service_type": "gallary", "endpoint_url": "http://127.0.0.1:8003", "status": "unknown", "other": {"nice": "pics"}, "entity_did": default_entity_did4, } paramter = "producer" get_request = "entity_did="+url.quote(default_entity_did4) make_test_post_and_get(api, request_body, paramter, get_request) def test_producer5(api: TestClient) -> None: request_body = { "uuid": "8e285c0c-4e40-430a-a477-26b3b81e30d4", "service_name": "Sara'''s Game-Shop", "service_type": "Game-Shop", "endpoint_url": "http://127.0.0.1:8004", "status": "unknown", "other": {"war": "games"}, "entity_did": default_entity_did5, } paramter = "producer" get_request = "entity_did="+url.quote(default_entity_did5) make_test_post_and_get(api, request_body, paramter, get_request) def test_create_services(api_client: ApiClient) -> None: sapi = ServicesApi(api_client=api_client) eapi = EntitiesApi(api_client=api_client) for midx, entity in enumerate(eapi.get_all_entities()): for idx in range(4): service_obj = create_service(idx + 4 * midx, entity) service = sapi.create_service(service_obj) assert service.uuid == service_obj.uuid