add event msg with populate and the right table in swagger

This commit is contained in:
Georg-Stahn
2024-01-08 20:19:07 +01:00
parent 5bd84b8532
commit 8fba3e6a5e
18 changed files with 1079 additions and 8 deletions

View File

@@ -1,13 +1,17 @@
import random
import time
import uuid
from openapi_client import ApiClient
from openapi_client.api import DefaultApi
from openapi_client.api.entities_api import EntitiesApi
from openapi_client.api.eventmessages_api import EventmessagesApi
from openapi_client.api.services_api import ServicesApi
from openapi_client.models import (
Entity,
EntityCreate,
Eventmessage,
EventmessageCreate,
Machine,
ServiceCreate,
Status,
@@ -70,3 +74,69 @@ def test_create_services(api_client: ApiClient) -> None:
service_obj = create_service(idx + 4 * midx, entity)
service = sapi.create_service(service_obj)
assert service.uuid == service_obj.uuid
random.seed(77)
def create_eventmessages(num: int = 2) -> list[EventmessageCreate]:
res = []
starttime = int(time.time())
for i in range(num):
group_id = i % 5 + random.getrandbits(6)
em_req_send = EventmessageCreate(
id=random.getrandbits(18),
timestamp=starttime + i * 10,
group=i % 5,
group_id=group_id,
msg_type=1,
src_did=f"did:sov:test:12{i}",
des_did=f"did:sov:test:12{i+1}",
msg={},
)
res.append(em_req_send)
em_req_rec = EventmessageCreate(
id=random.getrandbits(18),
timestamp=starttime + (i * 10) + 2,
group=i % 5,
group_id=group_id,
msg_type=2,
src_did=f"did:sov:test:12{i}",
des_did=f"did:sov:test:12{i+1}",
msg={},
)
res.append(em_req_rec)
group_id = i % 5 + random.getrandbits(6)
em_res_send = EventmessageCreate(
id=random.getrandbits(18),
timestamp=starttime + i * 10 + 4,
group=i % 5,
group_id=group_id,
msg_type=3,
src_did=f"did:sov:test:12{i+1}",
des_did=f"did:sov:test:12{i}",
msg={},
)
res.append(em_res_send)
em_res_rec = EventmessageCreate(
id=random.getrandbits(6),
timestamp=starttime + (i * 10) + 8,
group=i % 5,
group_id=group_id,
msg_type=4,
src_did=f"did:sov:test:12{i+1}",
des_did=f"did:sov:test:12{i}",
msg={},
)
res.append(em_res_rec)
return res
def test_create_eventmessages(api_client: ApiClient) -> None:
api = EventmessagesApi(api_client=api_client)
assert [] == api.get_all_eventmessages()
for own_eventmsg in create_eventmessages():
res: Eventmessage = api.create_eventmessage(own_eventmsg)
# breakpoint()
assert res.id == own_eventmsg.id
assert [] != api.get_all_eventmessages()