Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the ability to create identical events #109

Merged
merged 4 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 45 additions & 24 deletions calendar_backend/routes/event/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,23 @@ async def _get_timetable(start: date, end: date, group_id, lecturer_id, room_id,
return GetListEvent(items=events, limit=limit, offset=offset, total=cnt).model_dump(exclude=fmt)


def _is_unique_event(event_dict):
DaymasS marked this conversation as resolved.
Show resolved Hide resolved
existing_events_query = (
Event.get_all(session=db.session)
.filter(Event.name == event_dict["name"])
.filter(Event.start_ts == event_dict["start_ts"])
.filter(Event.end_ts == event_dict["end_ts"])
)
for existing_event in existing_events_query.all():
if (
[column.id for column in existing_event.group] == event_dict["group_id"]
and [column.id for column in existing_event.room] == event_dict["room_id"]
and [column.id for column in existing_event.lecturer] == event_dict["lecturer_id"]
):
return False
return True


@router.get("/", response_model=GetListEvent | None)
async def get_events(
start: date | None = Query(default=None, description="Default: Today"),
Expand All @@ -84,21 +101,22 @@ async def get_events(
return await fmt_cases[format]()


@router.post("/", response_model=EventGet)
@router.post("/", response_model=EventGet | None)
DaymasS marked this conversation as resolved.
Show resolved Hide resolved
async def create_event(event: EventPost, _=Depends(UnionAuth(scopes=["timetable.event.create"]))) -> EventGet:
event_dict = event.model_dump()
rooms = [Room.get(room_id, session=db.session) for room_id in event_dict.pop("room_id", [])]
lecturers = [Lecturer.get(lecturer_id, session=db.session) for lecturer_id in event_dict.pop("lecturer_id", [])]
groups = [Group.get(group_id, session=db.session) for group_id in event_dict.pop("group_id", [])]
event_get = Event.create(
**event_dict,
room=rooms,
lecturer=lecturers,
group=groups,
session=db.session,
)
db.session.commit()
return EventGet.model_validate(event_get)
if _is_unique_event(event_dict):
rooms = [Room.get(room_id, session=db.session) for room_id in event_dict.pop("room_id", [])]
lecturers = [Lecturer.get(lecturer_id, session=db.session) for lecturer_id in event_dict.pop("lecturer_id", [])]
groups = [Group.get(group_id, session=db.session) for group_id in event_dict.pop("group_id", [])]
event_get = Event.create(
**event_dict,
room=rooms,
lecturer=lecturers,
group=groups,
session=db.session,
)
db.session.commit()
return EventGet.model_validate(event_get)


@router.post("/bulk", response_model=list[EventGet])
Expand All @@ -108,18 +126,21 @@ async def create_events(
result = []
for event in events:
event_dict = event.model_dump()
rooms = [Room.get(room_id, session=db.session) for room_id in event_dict.pop("room_id", [])]
lecturers = [Lecturer.get(lecturer_id, session=db.session) for lecturer_id in event_dict.pop("lecturer_id", [])]
groups = [Group.get(group_id, session=db.session) for group_id in event_dict.pop("group_id", [])]
result.append(
Event.create(
**event_dict,
room=rooms,
lecturer=lecturers,
group=groups,
session=db.session,
if _is_unique_event(event_dict):
rooms = [Room.get(room_id, session=db.session) for room_id in event_dict.pop("room_id", [])]
Temmmmmo marked this conversation as resolved.
Show resolved Hide resolved
lecturers = [
Lecturer.get(lecturer_id, session=db.session) for lecturer_id in event_dict.pop("lecturer_id", [])
]
groups = [Group.get(group_id, session=db.session) for group_id in event_dict.pop("group_id", [])]
result.append(
Event.create(
**event_dict,
room=rooms,
lecturer=lecturers,
group=groups,
session=db.session,
)
)
)
db.session.commit()
adapter = TypeAdapter(list[EventGet])
return adapter.validate_python(result)
Expand Down
69 changes: 69 additions & 0 deletions tests/event/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,75 @@ def test_create_many(client_auth: TestClient, dbsession: Session, room_factory,
assert [row.id for row in response_model.group] == request_obj[1]["group_id"]


def test_create_clone(client_auth: TestClient, dbsession: Session, room_path, group_path, lecturer_path):
room_id = int(room_path.split("/")[-1])
group_id = int(group_path.split("/")[-1])
lecturer_id = int(lecturer_path.split("/")[-1])
time_stamp = datetime.datetime.now()
name = f"name_{time_stamp}"
request_obj = {
"name": name,
"room_id": [room_id],
"group_id": [group_id],
"lecturer_id": [lecturer_id],
"start_ts": "2022-08-26T22:32:38.575Z",
"end_ts": "2022-08-26T22:32:38.575Z",
}
response = client_auth.post(RESOURCE, json=request_obj)
assert response.status_code == status.HTTP_200_OK, response.json()
response = client_auth.post(RESOURCE, json=request_obj)
assert response.status_code == status.HTTP_200_OK
assert response.json() is None
events = dbsession.query(Event).filter(Event.name == name).all()
assert len(events) == 1


def test_create_many_clones(client_auth: TestClient, dbsession: Session, room_factory, group_factory, lecturer_factory):
time_stamp = datetime.datetime.now()
name1 = f"name1_{time_stamp}"
room_path1 = room_factory(client_auth)
group_path1 = group_factory(client_auth)
lecturer_path1 = lecturer_factory(client_auth)
name2 = f"name2_{time_stamp}"
room_path2 = room_factory(client_auth)
group_path2 = group_factory(client_auth)
lecturer_path2 = lecturer_factory(client_auth)
room_id1 = int(room_path1.split("/")[-1])
group_id1 = int(group_path1.split("/")[-1])
lecturer_id1 = int(lecturer_path1.split("/")[-1])
room_id2 = int(room_path2.split("/")[-1])
group_id2 = int(group_path2.split("/")[-1])
lecturer_id2 = int(lecturer_path2.split("/")[-1])
request_obj = [
{
"name": name1,
"room_id": [room_id1],
"group_id": [group_id1],
"lecturer_id": [lecturer_id1],
"start_ts": "2022-08-26T22:32:38.575Z",
"end_ts": "2022-08-26T22:32:38.575Z",
},
{
"name": name2,
"room_id": [room_id2],
"group_id": [group_id2],
"lecturer_id": [lecturer_id2],
"start_ts": "2022-08-26T22:32:38.575Z",
"end_ts": "2022-08-26T22:32:38.575Z",
},
]
response = client_auth.post(f"{RESOURCE}bulk", json=request_obj)
assert response.status_code == status.HTTP_200_OK, response.json()
assert response.json()[0]["name"] == request_obj[0]["name"]
response = client_auth.post(f"{RESOURCE}bulk", json=request_obj)
assert response.status_code == status.HTTP_200_OK
assert len(response.json()) == 0
events = dbsession.query(Event).filter(Event.name == name1).all()
assert len(events) == 1
events = dbsession.query(Event).filter(Event.name == name2).all()
assert len(events) == 1


def test_delete(client_auth: TestClient, dbsession: Session, room_path, lecturer_path, group_path):
room_id = int(room_path.split("/")[-1])
group_id = int(group_path.split("/")[-1])
Expand Down
Loading